Java Streams Class Code Examples

This article collects typical usage examples of the Java class reactor.rx.Streams. If you are wondering how the Streams class is used in practice, or are looking for working examples of it, the selected code samples below may help.



The Streams class belongs to the reactor.rx package. 20 code examples of the class are shown below, sorted by popularity by default.

Example 1: equals

import reactor.rx.Streams; // import the required package/class
@Override
public boolean equals(Object o) {
    if (o == this)
        return true;
    if (!(o instanceof RedissonListReactive))
        return false;

    Stream<Object> e1 = Streams.wrap((Publisher<Object>)iterator());
    Stream<Object> e2 = Streams.wrap(((RedissonListReactive<Object>) o).iterator());
    Long count = Streams.merge(e1, e2).groupBy(new Function<Object, Object>() {
        @Override
        public Object apply(Object t) {
            return t;
        }
    }).count().next().poll();

    boolean res = count.intValue() == Streams.wrap(size()).next().poll();
    res &= count.intValue() == Streams.wrap(((RedissonListReactive<Object>) o).size()).next().poll();
    return res;
}
 
Developer ID: qq1588518, Project: JRediClients, Lines of code: 21, Source: RedissonListReactive.java
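
Read without the Reactor types, the comparison in Example 1 appears to merge both lists, group the merged elements by value, count the groups, and then require that count to equal the size of each list. The following is a minimal plain-Java sketch of that reading, not Redisson's code. The class and method names are hypothetical, and a HashSet stands in for the groupBy/count step.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: mirrors the "count distinct merged elements" check
// from Example 1 using plain collections instead of Reactor streams.
final class DistinctCountEquality {

    static boolean sameByDistinctCount(List<?> a, List<?> b) {
        // Merging both lists into a set plays the role of merge + groupBy + count.
        Set<Object> distinct = new HashSet<>(a);
        distinct.addAll(b);
        int count = distinct.size();
        // The result is true only if the distinct count matches both sizes.
        return count == a.size() && count == b.size();
    }

    public static void main(String[] args) {
        System.out.println(sameByDistinctCount(Arrays.asList(1, 2, 3), Arrays.asList(3, 2, 1))); // true
        System.out.println(sameByDistinctCount(Arrays.asList(1, 1), Arrays.asList(1, 1)));       // false
        System.out.println(sameByDistinctCount(Arrays.asList(1, 2), Arrays.asList(1, 2, 3)));    // false
    }
}
```

Under this reading, element order is ignored, and two separate lists that contain duplicate elements never compare equal, which is worth keeping in mind before reusing the pattern.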


Example 2: mapFunction

import reactor.rx.Streams; // import the required package/class
private Function<Entry<K, V>, Boolean> mapFunction(final RMapReactive<Object, Object> m) {
    return new Function<Map.Entry<K, V>, Boolean>() {
        @Override
        public Boolean apply(Entry<K, V> e) {
            Object key = e.getKey();
            Object value = e.getValue();
            if (value == null) {
                if (!(Streams.create(m.get(key)).next().poll() ==null && Streams.create(m.containsKey(key)).next().poll()))
                    return false;
            } else {
                if (!value.equals(Streams.create(m.get(key)).next().poll()))
                    return false;
            }
            return true;
        }
    };
}
 
Developer ID: qq1588518, Project: JRediClients, Lines of code: 18, Source: RedissonMapReactive.java


Example 3: echoJsonStreamDecoding

import reactor.rx.Streams; // import the required package/class
private static void echoJsonStreamDecoding() {

        TcpServer<Person, Person> transport = Netty4TcpServer.<Person, Person>create(
                0,
                new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        channel.pipeline().addFirst(
                                new JsonObjectDecoder(),
                                new JacksonJsonCodec());
                    }
                });

        ReactorTcpServer.create(transport)
                .start(connection -> {
                    connection.log("input")
                            .observeComplete(v -> LOG.info("Connection input complete"))
                            .capacity(1)
                            .consume(person -> {
                                person = new Person(person.getLastName(), person.getFirstName());
                                Streams.wrap(connection.writeWith(Streams.just(person))).consume();
                            });
                    return Streams.never();
                });

    }
 
Developer ID: reactive-ipc, Project: reactive-ipc-jvm, Lines of code: 27, Source: CodecSample.java


Example 4: getHandler

import reactor.rx.Streams; // import the required package/class
private ReactorChannelHandler<String, String, HttpChannel<String, String>> getHandler() {
	return channel -> {
		channel.headers()
				.entries()
				.forEach(
						entry1 -> System.out.println(String.format(
								"header [%s=>%s]", entry1.getKey(),
								entry1.getValue())));
		String response;
		try {
			response = getWebPage("src/main/java/webapp/ws.html");
		} catch (IOException e) {
			e.printStackTrace();
			response = e.getMessage();
		}
		System.out.println(String.format("%s from thread %s",
				response.toString(), Thread.currentThread()));
		return channel.writeWith(Streams.just(response
				.toString()));
	};
}
 
Developer ID: iproduct, Project: low-latency-high-throughput, Lines of code: 22, Source: ReactorWishesWS.java


Example 5: postHandler

import reactor.rx.Streams; // import the required package/class
private ReactorChannelHandler<Buffer, Buffer, HttpChannel<Buffer, Buffer>> postHandler() {
	return channel -> {

		channel.headers().entries().forEach(entry -> System.out.println(String.format("header [%s=>%s]", entry.getKey(),
				entry.getValue())));

		return channel.writeWith(Streams
				.wrap(channel)
				.take(1)
				.log("received")
				.flatMap(data -> {
					final StringBuilder response = new StringBuilder().append("hello ").append(new String(data.asBytes()));
					System.out.println(String.format("%s from thread %s", response.toString(), Thread.currentThread()));
					return Streams.just(Buffer.wrap(response.toString()));
				}));
	};
}
 
Developer ID: iproduct, Project: low-latency-high-throughput, Lines of code: 18, Source: ReactorNetGetPost.java


Example 6: main

import reactor.rx.Streams; // import the required package/class
public static void main(String[] args) throws InterruptedException {
	Environment env = Environment.initialize();
	CountDownLatch latch = new CountDownLatch(5);

	Dispatcher threadPoolDispatcher = new ThreadPoolExecutorDispatcher(5, 128);

	Consumer<String> consumer = ev -> {
	    LOG.info("Hello " + ev + " from thread: " + Thread.currentThread() + "\n");
	    latch.countDown();
	};

	Consumer<Throwable> errorConsumer = error ->
	    error.printStackTrace();

	// a task is submitted to the thread pool dispatcher
	Stream<String> stream = Streams.just("One", "Two", "Three", "Four", "Five", "Six", "Seven");
	stream.dispatchOn(env).consume(ev -> {
				System.out.println(ev);
				threadPoolDispatcher.dispatch(ev, consumer, errorConsumer);
			});

	latch.await(15, TimeUnit.SECONDS); // Wait for task to execute

}
 
Developer ID: iproduct, Project: low-latency-high-throughput, Lines of code: 25, Source: ThreaadPoolExecutorDispatcher.java


Example 7: main

import reactor.rx.Streams; // import the required package/class
public static void main(String[] args) throws InterruptedException {
	Environment env = Environment.initialize();
	CountDownLatch latch = new CountDownLatch(5);

	DispatcherSupplier supplier = Environment.newCachedDispatchers(3, "myPool");
	
	Consumer<String> consumer = ev -> {
	    LOG.info("Hello " + ev + " from thread: " +Thread.currentThread() + "\n");
	    latch.countDown(); 
	};

	Consumer<Throwable> errorConsumer = error ->
	    error.printStackTrace();

	// a task is submitted to the thread pool dispatcher
	Stream<String> stream = Streams.just("One", "Two", "Three", "Four", "Five", "Six", "Seven");
	stream.dispatchOn(env).partition().consume(
		groupStream -> 
			groupStream.dispatchOn(supplier.get()).consume(consumer, errorConsumer)
	);
	latch.await(15, TimeUnit.SECONDS); // Wait for task to execute

}
 
Developer ID: iproduct, Project: low-latency-high-throughput, Lines of code: 24, Source: DispatcherSupplierPartitioning.java


Example 8: main

import reactor.rx.Streams; // import the required package/class
public static void main(String[] args) throws InterruptedException {
	Environment env = Environment.initialize();

	Processor<String, String> p = RingBufferProcessor.create("testProcessor", 32); 
	Stream<String> s1 = Streams.wrap(p); 

	s1.consume(ev -> System.out.println(Thread.currentThread() + " data=" + ev)); 
	s1.consume(ev -> System.out.println(Thread.currentThread() + " data=" + ev)); 
	s1.consume(ev -> System.out.println(Thread.currentThread() + " data=" + ev)); 
	
	p.onNext("One");
	p.onNext("Two");
	p.onNext("Three");
	p.onComplete();
	
	Environment.terminate();

}
 
Developer ID: iproduct, Project: low-latency-high-throughput, Lines of code: 19, Source: RingBufferProcessorDemo.java


Example 9: main

import reactor.rx.Streams; // import the required package/class
public static void main(String[] args) throws InterruptedException {
	Environment env = Environment.initialize();

	Processor<String, String> p = RingBufferWorkProcessor.create("testProcessor", 32); 
	Stream<String> s1 = Streams.wrap(p); 

	s1.consume(ev -> System.out.println(Thread.currentThread() + " data=" + ev)); 
	s1.consume(ev -> System.out.println(Thread.currentThread() + " data=" + ev)); 
	s1.consume(ev -> System.out.println(Thread.currentThread() + " data=" + ev)); 
	
	p.onNext("One");
	p.onNext("Two");
	p.onNext("Three");
	p.onNext("Four");
	p.onNext("Five");
	p.onComplete();
	
	Environment.terminate();

}
 
Developer ID: iproduct, Project: low-latency-high-throughput, Lines of code: 21, Source: RingBufferWorkProcessorDemo.java


Example 10: getHandler

import reactor.rx.Streams; // import the required package/class
private ReactorChannelHandler<String, String, HttpChannel<String, String>> getHandler() {
		return channel -> {
//			channel.headers()
//					.entries()
//					.forEach(
//							entry1 -> System.out.println(String.format(
//									"header [%s=>%s]", entry1.getKey(),
//									entry1.getValue())));
//			System.out.println(channel.uri());
			String uri = channel.uri();
			if (uri.equals("/")) 
				uri = "/index.html";
			String path = "src/main/webapp" + uri;	
			
			String response;
			try {
				response = getStaticResource(path);
			} catch (IOException e) {
				e.printStackTrace();
				response = e.getMessage();
			}
		
			return channel.writeWith(Streams.just(response
					.toString()));
		};
	}
 
Developer ID: iproduct, Project: low-latency-high-throughput, Lines of code: 27, Source: ReactorWishesWS.java


Example 11: process

import reactor.rx.Streams; // import the required package/class
@Override
public Stream<Tuple> process(Stream<String> stream) {


	return stream.flatMap(tweet -> {
		JSONArray array = JsonPath.read(tweet, "$.entities.hashtags[*].text");
		return Streams.from(array.toArray(new String[array.size()]));
	})
			.map(w -> reactor.fn.tuple.Tuple.of(w, 1))
			.window(timeWindow, SECONDS)
			.flatMap(s -> BiStreams.reduceByKey(s, (acc, next) -> acc + next)
					.sort((a, b) -> -a.t2.compareTo(b.t2))
					.take(topN))
			.map(entry -> tuple().of("hashtag", entry.t1, "count", entry.t2));

}
 
Developer ID: spring-projects, Project: spring-xd-samples, Lines of code: 17, Source: TopTags.java
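
The pipeline in Example 11 counts hashtags per time window, sorts the counts in descending order, and keeps the first topN. Below is a minimal sketch of that count, sort, and truncate step for a single window, written in plain Java without Reactor, JsonPath, or Spring XD. The class name, the method name, and the alphabetical tie-break are assumptions added for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: counts hashtags in one window and keeps the top N.
final class TopTagsSketch {

    static Map<String, Integer> topTags(List<String> hashtags, int topN) {
        // Reduce by key: one counter per hashtag.
        Map<String, Integer> counts = new HashMap<>();
        for (String tag : hashtags) {
            counts.merge(tag, 1, Integer::sum);
        }

        // Sort by count, highest first; ties are broken alphabetically (an assumption).
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> {
            int byCount = b.getValue().compareTo(a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });

        // Keep at most topN entries, preserving the sorted order.
        Map<String, Integer> top = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : entries.subList(0, Math.min(topN, entries.size()))) {
            top.put(e.getKey(), e.getValue());
        }
        return top;
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("java", "reactor", "java", "rx", "reactor", "java");
        System.out.println(topTags(window, 2)); // {java=3, reactor=2}
    }
}
```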


Example 12: getKeysByPattern

import reactor.rx.Streams; // import the required package/class
@Override
public Publisher<String> getKeysByPattern(final String pattern) {
    List<Publisher<String>> publishers = new ArrayList<Publisher<String>>();
    for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) {
        publishers.add(createKeysIterator(entry, pattern));
    }
    return Streams.merge(publishers);
}
 
Developer ID: qq1588518, Project: JRediClients, Lines of code: 9, Source: RedissonKeysReactive.java


Example 13: sync

import reactor.rx.Streams; // import the required package/class
public static <V> V sync(Publisher<V> ob) {
    Promise<V> promise;
    if (Promise.class.isAssignableFrom(ob.getClass())) {
        promise = (Promise<V>) ob;
    } else {
        promise = Streams.wrap(ob).next();
    }

    V val = promise.poll();
    if (promise.isError()) {
        throw new RuntimeException(promise.reason());
    }
    return val;
}
 
Developer ID: qq1588518, Project: JRediClients, Lines of code: 15, Source: BaseReactiveTest.java
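
The helper in Example 13 blocks on an asynchronous result and rethrows a failure as a RuntimeException. A rough analogue using java.util.concurrent.CompletableFuture instead of Reactor's Promise is sketched below. The class name and the explicit timeout parameter are assumptions and are not part of the original test helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical analogue of the sync helper, built on CompletableFuture.
final class SyncSketch {

    static <V> V sync(CompletableFuture<V> future, long timeoutMillis) {
        try {
            // Block until the value is available or the timeout expires.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The future failed: surface the original cause to the caller.
            throw new RuntimeException(e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sync(CompletableFuture.completedFuture("value"), 100)); // value
    }
}
```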


Example 14: createProcessor

import reactor.rx.Streams; // import the required package/class
@PostConstruct
private void createProcessor() {
    processor = RingBufferProcessor.create();
    Stream stream = Streams.wrap(processor);
    stream.buffer(1, TimeUnit.SECONDS).consume(new Consumer<List<Long>>() {
        @Override
        public void accept(List<Long> repositoryIds) {
            for (Long repositoryId : Sets.newHashSet(repositoryIds)) {
                setRepositoryStatsOutOfDate(repositoryId);
            }
        }
    });
}
 
Developer ID: box, Project: mojito, Lines of code: 14, Source: RepositoryStatisticsUpdatedReactor.java


Example 15: writeMultipleValues

import reactor.rx.Streams; // import the required package/class
@Test
public void writeMultipleValues() throws IOException {
    Promise<ByteBuf> chunk1 = Promises.success(Unpooled.buffer().writeBytes("This is".getBytes()));
    Promise<ByteBuf> chunk2 = Promises.success(Unpooled.buffer().writeBytes(" a test!".getBytes()));
    reactorServer.start(connection -> connection.writeWith(Streams.concat(chunk1, chunk2)));
    assertEquals("This is a test!", SocketTestUtils.read("localhost", reactorServer.getPort()));
}
 
Developer ID: reactive-ipc, Project: reactive-ipc-jvm, Lines of code: 8, Source: ReactorTcpServerTests.java


Example 16: echo

import reactor.rx.Streams; // import the required package/class
/**
 * Keep echoing until the client goes away.
 */
private static void echo(TcpServer<ByteBuf, ByteBuf> transport) {
    ReactorTcpServer.create(transport)
        .startAndAwait(connection -> {
            connection.flatMap(inByteBuf -> {
                String text = "Hello " + inByteBuf.toString(Charset.defaultCharset());
                ByteBuf outByteBuf = Unpooled.buffer().writeBytes(text.getBytes());
                return connection.writeWith(Streams.just(outByteBuf));
            }).consume();
            return Streams.never();
        });
}
 
Developer ID: reactive-ipc, Project: reactive-ipc-jvm, Lines of code: 15, Source: ReactorTcpServerSample.java


Example 17: echoWithQuitCommand

import reactor.rx.Streams; // import the required package/class
/**
 * Keep echoing until the client sends "quit".
 */
private static void echoWithQuitCommand(TcpServer<ByteBuf, ByteBuf> transport) {
    ReactorTcpServer.create(transport)
        .start(connection -> connection
                .map(byteBuf -> byteBuf.toString(Charset.defaultCharset()))
                .takeWhile(input -> !"quit".equalsIgnoreCase(input.trim()))
                .filter(input -> !"quit".equalsIgnoreCase(input.trim()))
                .map(input -> "Hello " + input)
                .flatMap(text -> connection.writeWith(
                        Streams.just(Unpooled.buffer().writeBytes(text.getBytes()))
                    )
                )
                .after()
        );
}
 
Developer ID: reactive-ipc, Project: reactive-ipc-jvm, Lines of code: 18, Source: ReactorTcpServerSample.java


Example 18: runLineBasedFrameDecoder

import reactor.rx.Streams; // import the required package/class
private static void runLineBasedFrameDecoder() {

        TcpServer<String, String> transport = Netty4TcpServer.<String, String>create(
                0,
                new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        int bufferSize = 1;
                        ChannelConfig config = channel.config();
                        config.setOption(ChannelOption.SO_RCVBUF, bufferSize);
                        config.setOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferSize));
                        channel.pipeline().addFirst(
                                new LineBasedFrameDecoder(256),
                                new StringDecoder(CharsetUtil.UTF_8),
                                new StringEncoder(CharsetUtil.UTF_8));
                    }
                });

        ReactorTcpServer.create(transport).start(connection -> {
            connection.log("input")
                    .observeComplete(v -> LOG.info("Connection input complete"))
                    .capacity(1)
                    .consume(line -> {
                        String response = "Hello " + line + "\n";
                        Streams.wrap(connection.writeWith(Streams.just(response))).consume();
                    });
            return Streams.never();
        });
    }
 
Developer ID: reactive-ipc, Project: reactive-ipc-jvm, Lines of code: 30, Source: CodecSample.java


Example 19: TaxiStream

import reactor.rx.Streams; // import the required package/class
public TaxiStream(String fileName) {
    this.fileName = fileName;

    // create a Processor with an internal RingBuffer capacity of 32 slots
    this.tripsProcessor = RingBufferProcessor.create("trips", 32);

    // create a Reactor Stream from this Reactive Streams Processor
    this.trips = Streams.wrap(this.tripsProcessor);
}
 
Developer ID: jeryini, Project: TaxiManagementSystem, Lines of code: 10, Source: TaxiStream.java


Example 20: main

import reactor.rx.Streams; // import the required package/class
public static void main(String... args) throws InterruptedException {
  Environment.initialize(); 

  Streams.zip(
      Streams.range(0, 3),
      Streams.from(new String[]{"Hello", "from", "Reactor", "Websocket"}),
      t2 -> t2)
    .throttle(3000)
    .consume(t2 -> System.out.println(t2.getT1() + ": " + t2.getT2()));
	
  Thread.sleep(500); 
}
 
Developer ID: iproduct, Project: low-latency-high-throughput, Lines of code: 12, Source: ZipStreamsDemo.java
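
Example 20 pairs elements from two sources with Streams.zip. As a rough illustration of pairwise zipping in general, here is a plain-Java sketch over two lists that stops at the shorter input. The class name, method name, and sample numbers are hypothetical, and the sketch makes no claim about the bounds of Streams.range or about throttling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical helper: combines two lists element by element.
final class ZipSketch {

    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combiner) {
        // Only as many pairs as the shorter list can supply.
        int n = Math.min(left.size(), right.size());
        List<R> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(combiner.apply(left.get(i), right.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> numbers = Arrays.asList(0L, 1L, 2L);
        List<String> words = Arrays.asList("Hello", "from", "Reactor", "Websocket");
        // Prints "0: Hello", "1: from", "2: Reactor"; "Websocket" has no partner.
        zip(numbers, words, (n, w) -> n + ": " + w).forEach(System.out::println);
    }
}
```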



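Note: The reactor.rx.Streams examples in this article were compiled from source code and documentation hosting platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.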

