• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java Functions类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中com.github.davidmoten.rx.Functions的典型用法代码示例。如果您正苦于以下问题:Java Functions类的具体用法?Java Functions怎么用?Java Functions使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



Functions类属于com.github.davidmoten.rx包,在下文中一共展示了Functions类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: get

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
private static Observable<SqsMessage> get(AmazonSQSClient sqs, String queueName,
        Optional<String> bucketName, Optional<AmazonS3Client> s3, Service service,
        int waitTimeSeconds) {
    return Observable.defer(() -> {
        String queueUrl = sqs.getQueueUrl(queueName).getQueueUrl();
        return Observable
                .just(sqs.receiveMessage(request(queueName, waitTimeSeconds)) //
                        .getMessages() //
                        .stream() //
                        .map(m -> Sqs.getNextMessage(m, queueUrl, bucketName, s3, sqs, service)) //
                        .collect(Collectors.toList())) //
                .concatWith(Observable
                        .defer(() -> Observable.just(sqs.receiveMessage(request(queueName, 0)) //
                                .getMessages() //
                                .stream() //
                                .map(m -> Sqs.getNextMessage(m, queueUrl, bucketName, s3, sqs,
                                        service)) //
                        .collect(Collectors.toList()))) //
                        .repeat())
                .takeWhile(list -> !list.isEmpty()) //
                .flatMapIterable(Functions.identity()) //
                .filter(opt -> opt.isPresent()).map(opt -> opt.get());
    });//
}
 
开发者ID:davidmoten,项目名称:rxjava-aws,代码行数:25,代码来源:Sqs.java


示例2: createServerSocketObservable

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
private static Observable<Observable<byte[]>> createServerSocketObservable(
        ServerSocket serverSocket, final long timeoutMs, final int bufferSize,
        final Action0 preAcceptAction, final Func1<? super Socket, Boolean> acceptSocket) {
    return Observable.create( //
            SyncOnSubscribe.<ServerSocket, Observable<byte[]>> createSingleState( //
                    Functions.constant0(serverSocket), //
                    new Action2<ServerSocket, Observer<? super Observable<byte[]>>>() {

                        @Override
                        public void call(ServerSocket ss,
                                Observer<? super Observable<byte[]>> observer) {
                            acceptConnection(timeoutMs, bufferSize, ss, observer,
                                    preAcceptAction, acceptSocket);
                        }
                    }));
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:17,代码来源:ObservableServerSocket.java


示例3: testRetryWhenSpecificExceptionAllowedUsePredicateReturnsFalse

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
@Test
public void testRetryWhenSpecificExceptionAllowedUsePredicateReturnsFalse() {
    Exception ex = new IllegalArgumentException("boo");
    TestSubscriber<Integer> ts = TestSubscriber.create();
    TestScheduler scheduler = new TestScheduler();
    Func1<Throwable, Boolean> predicate = Functions.alwaysFalse();
    Observable.just(1, 2)
            // force error after 3 emissions
            .concatWith(Observable.<Integer> error(ex))
            // retry with backoff
            .retryWhen(
                    RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
                            .scheduler(scheduler).retryIf(predicate).build())
            // go
            .subscribe(ts);
    ts.assertValues(1, 2);
    ts.assertError(ex);
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:19,代码来源:RetryWhenTest.java


示例4: testStateTransitionThrowsError

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
@Test
public void testStateTransitionThrowsError() {
    final RuntimeException ex = new RuntimeException("boo");
    Func0<Integer> initialState = Functions.constant0(1);
    Func3<Integer, Integer, Observer<Integer>, Integer> transition = new Func3<Integer, Integer, Observer<Integer>, Integer>() {

        @Override
        public Integer call(Integer collection, Integer t, Observer<Integer> observer) {
            throw ex;
        }

    };
    Func2<Integer, Observer<Integer>, Boolean> completion = Functions.alwaysTrue2();
    Transformer<Integer, Integer> transformer = Transformers.stateMachine(initialState,
            transition, completion);
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    Observable.just(1, 1, 1).compose(transformer).subscribe(ts);
    ts.awaitTerminalEvent();
    ts.assertError(ex);
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:21,代码来源:TransformerStateMachineTest.java


示例5: testUnsubscriptionFromTransition

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
@Test
public void testUnsubscriptionFromTransition() {
    Func0<Integer> initialState = Functions.constant0(1);
    Func3<Integer, Integer, Subscriber<Integer>, Integer> transition = new Func3<Integer, Integer, Subscriber<Integer>, Integer>() {

        @Override
        public Integer call(Integer collection, Integer t, Subscriber<Integer> subscriber) {
            subscriber.onNext(123);
            subscriber.unsubscribe();
            return 1;
        }

    };
    Func2<Integer, Observer<Integer>, Boolean> completion = Functions.alwaysTrue2();
    Transformer<Integer, Integer> transformer = Transformers.stateMachine(initialState,
            transition, completion);
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    Observable.just(1, 1, 1).repeat().compose(transformer).subscribe(ts);
    ts.assertValue(123);
    ts.assertCompleted();
    ts.assertUnsubscribed();
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:23,代码来源:TransformerStateMachineTest.java


示例6: testForCompletionWithinStateMachine

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
@Test
public void testForCompletionWithinStateMachine() {
    Func0<Integer> initialState = Functions.constant0(1);
    Func3<Integer, Integer, Subscriber<Integer>, Integer> transition = new Func3<Integer, Integer, Subscriber<Integer>, Integer>() {

        @Override
        public Integer call(Integer collection, Integer t, Subscriber<Integer> subscriber) {
            subscriber.onNext(123);
            // complete from within transition
            subscriber.onCompleted();
            return 1;
        }

    };
    Func2<? super Integer, ? super Subscriber<Integer>, Boolean> completion = Functions
            .alwaysTrue2();
    Transformer<Integer, Integer> transformer = Transformers.stateMachine(initialState,
            transition, completion);
    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    final AtomicInteger count = new AtomicInteger(0);
    Observable.just(1, 2, 3).doOnNext(Actions.increment1(count)).compose(transformer)
            .subscribe(ts);
    ts.assertValues(123);
    ts.assertCompleted();
    assertEquals(1, count.get());
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:27,代码来源:TransformerStateMachineTest.java


示例7: testUnsubscribeFromAsynchronousSource

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
@Test
public void testUnsubscribeFromAsynchronousSource() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    Observable
            // every 100ms
            .interval(100, TimeUnit.MILLISECONDS)
            // detect unsubscribe
            .doOnUnsubscribe(countDown(latch))
            // use toOperator
            .lift(toOperator(Functions.<Observable<Long>> identity())).take(1).first()
            // block and get result
            .toBlocking().single();
    // wait for expected unsubscription
    assertTrue(latch.await(AWAIT_SECONDS, TimeUnit.SECONDS));

}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:17,代码来源:OperatorFromTransformerTest.java


示例8: testUnsubscribeFromSynchronousSource

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
@Test
public void testUnsubscribeFromSynchronousSource() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    PublishSubject<Integer> subject = PublishSubject.create();
    subject
            // detect unsubscribe
            .doOnUnsubscribe(countDown(latch))
            // use toOperator
            .lift(toOperator(Functions.<Observable<Integer>> identity()))
            // get first only
            .take(1)
            // subscribe and ignore events
            .subscribe();
    subject.onNext(1);
    // should have unsubscribed because of take(1)
    assertTrue(latch.await(AWAIT_SECONDS, TimeUnit.SECONDS));
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:18,代码来源:OperatorFromTransformerTest.java


示例9: testBackpressure

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
@Test
public void testBackpressure() {
    TestSubscriber<Integer> ts = TestSubscriber.create(1);
    Observable.
            //
            range(1, 1000)
            // use toOperator
            .lift(toOperator(Functions.<Observable<Integer>> identity()))
            // block and get result
            .subscribe(ts);
    // make sure only one value has arrived
    ts.assertValueCount(1);
    ts.requestMore(2);
    ts.assertValueCount(3);
    ts.unsubscribe();
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:17,代码来源:OperatorFromTransformerTest.java


示例10: value

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
public Builder<T> value(final String prefix) {
    Func1<Func1<T, String>, Func1<T, String>> message = chainMessage(
            new Func2<Func1<T, String>, T, String>() {
                @Override
                public String call(Func1<T, String> f, T t) {
                    StringBuilder line = new StringBuilder();
                    line.append(prefix);
                    line.append(String.valueOf(t));
                    return line.toString();
                }

            });
    Func1<Action1<T>, Action1<T>> action = Functions.identity();
    transitions.add(new Transition<T>(action, message));
    return this;
}
 
开发者ID:davidmoten,项目名称:rxjava-slf4j,代码行数:17,代码来源:Log.java


示例11: sortFileFixes

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
private static Func1<File, Observable<Integer>> sortFileFixes(final long downSampleIntervalMs) {
    return file -> {
        return BinaryFixes.from(file)
                // to list
                .toList()
                // sort each list
                .map(sortFixes())
                // flatten
                .flatMapIterable(Functions.<List<Fix>> identity())
                // downsample the sorted fixes
                .compose(Downsample.minTimeStep(downSampleIntervalMs, TimeUnit.MILLISECONDS))
                .cast(HasFix.class)
                // make into a list again
                .toList()
                // replace the file with sorted fixes
                .doOnNext(writeFixes(file))
                // count the fixes
                .count();
    };
}
 
开发者ID:amsa-code,项目名称:risky,代码行数:21,代码来源:BinaryFixes.java


示例12: testRecursion

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
@Test
public void testRecursion() {
	final Func1<List<Integer>, Integer> total = new Func1<List<Integer>, Integer>() {

		@Override
		public Integer call(List<Integer> list) {
			int count = 0;
			for (int i : list)
				count += i;
			return count;
		}
	};
	Transformer<Integer, Integer> f = new Transformer<Integer, Integer>() {

		@Override
		public Observable<Integer> call(Observable<Integer> x) {
			return x.buffer(2).map(total);
		}
	};

	Observable<Integer> ones = Observable.just(1).repeat().take(100);
	System.out
			.println(ones.compose(f).compose(f).compose(f).compose(f)
					.compose(f).compose(f).compose(f).count().toBlocking()
					.single());

	int sum = ones
			.map(x -> Observable.just(x))
			.reduce(new Func2<Observable<Integer>, Observable<Integer>, Observable<Integer>>() {

				@Override
				public Observable<Integer> call(Observable<Integer> a,
						Observable<Integer> b) {
					return a.concatWith(b).toList().map(total);
				}
			}).flatMap(Functions.<Observable<Integer>> identity()).single()
			.toBlocking().single();
	assertEquals(100, sum);
}
 
开发者ID:davidmoten,项目名称:bigsort,代码行数:40,代码来源:OnSubscribeRefreshSelectTest.java


示例13: testAcceptSocketRejectsAlways

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
@Test
public void testAcceptSocketRejectsAlways()
        throws UnknownHostException, IOException, InterruptedException {
    reset();
    TestSubscriber<Object> ts = TestSubscriber.create();
    try {
        int bufferSize = 4;
        AtomicInteger port = new AtomicInteger();
        IO.serverSocketAutoAllocatePort(Actions.setAtomic(port)) //
                .readTimeoutMs(10000) //
                .acceptTimeoutMs(200) //
                .bufferSize(bufferSize) //
                .acceptSocketIf(Functions.alwaysFalse()) //
                .create() //
                .subscribeOn(scheduler) //
                .subscribe(ts);
        Thread.sleep(300);
        Socket socket = new Socket("localhost", port.get());
        OutputStream out = socket.getOutputStream();
        out.write("12345678901234567890".getBytes());
        out.close();
        socket.close();
        Thread.sleep(1000);
        ts.assertNoValues();
    } finally {
        // will close server socket
        ts.unsubscribe();
    }
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:30,代码来源:ObservableServerSocketTest.java


示例14: testKeyFunctionAThrowsResultsInErrorEmission

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
@Test
public void testKeyFunctionAThrowsResultsInErrorEmission() {
    Observable<Integer> a = Observable.just(1);
    Observable<Integer> b = Observable.just(1);
    Obs.match(a, b, Functions.throwing(), Functions.identity(), COMBINER)
            .to(TestingHelper.<Integer> test()).assertNoValues()
            .assertError(Functions.ThrowingException.class);
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:9,代码来源:OnSubscribeMatchTest.java


示例15: testKeyFunctionBThrowsResultsInErrorEmission

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
@Test
public void testKeyFunctionBThrowsResultsInErrorEmission() {
    Observable<Integer> a = Observable.just(1);
    Observable<Integer> b = Observable.just(1);
    Obs.match(a, b, Functions.identity(), Functions.throwing(), COMBINER)
            .to(TestingHelper.<Integer> test()) //
            .assertNoValues() //
            .assertError(Functions.ThrowingException.class);
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:10,代码来源:OnSubscribeMatchTest.java


示例16: testCombinerFunctionBThrowsResultsInErrorEmission

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
@Test
public void testCombinerFunctionBThrowsResultsInErrorEmission() {
    Observable<Integer> a = Observable.just(1, 2);
    Observable<Integer> b = Observable.just(2, 1);
    Obs.match(a, b, Functions.identity(), Functions.identity(),
            Functions.<Integer, Integer, Integer> throwing2())
            .to(TestingHelper.<Integer> test()) //
            .assertNoValues() //
            .assertError(Functions.ThrowingException.class);
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:11,代码来源:OnSubscribeMatchTest.java


示例17: testHandlesNulls

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
@Test
public void testHandlesNulls() {
    Observable<Integer> a = Observable.just(null, null);
    Observable<Integer> b = Observable.just(null, null);
    Obs.match(a, b, Functions.constant(1), Functions.constant(1), COMBINER)
            .to(TestingHelper.<Integer> test()) //
            .assertValues(null, null) //
            .assertCompleted();
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:10,代码来源:OnSubscribeMatchTest.java


示例18: testCombinerFunctionBThrowsResultsInErrorEmissionSwitched

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
@Test
public void testCombinerFunctionBThrowsResultsInErrorEmissionSwitched() {
    Observable<Integer> a = Observable.just(2, 1);
    Observable<Integer> b = Observable.just(1, 2);
    Obs.match(a, b, Functions.identity(), Functions.identity(),
            Functions.<Integer, Integer, Integer> throwing2())
            .to(TestingHelper.<Integer> test()).assertNoValues()
            .assertError(Functions.ThrowingException.class);
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:10,代码来源:OnSubscribeMatchTest.java


示例19: testMultipleNonSimultaeousSubscriptions

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
@Test
public void testMultipleNonSimultaeousSubscriptions() {
    Observable<Integer> sequence = Observable.range(1, 3)
            .lift(toOperator(Functions.<Observable<Integer>> identity()));
    assertEquals(asList(1, 2, 3), sequence.toList().toBlocking().single());
    assertEquals(asList(1, 2, 3), sequence.toList().toBlocking().single());
}
 
开发者ID:davidmoten,项目名称:rxjava-extras,代码行数:8,代码来源:OperatorFromTransformerTest.java


示例20: memory

import com.github.davidmoten.rx.Functions; //导入依赖的package包/类
public Builder<T> memory() {
    Func1<Func1<T, String>, Func1<T, String>> message = chainMessage(
            new Func2<Func1<T, String>, T, String>() {
                @Override
                public String call(Func1<T, String> f, T t) {
                    return memoryUsage();
                }
            });
    Func1<Action1<T>, Action1<T>> action = Functions.identity();
    transitions.add(new Transition<T>(action, message));
    return this;
}
 
开发者ID:davidmoten,项目名称:rxjava-slf4j,代码行数:13,代码来源:Log.java



注:本文中的com.github.davidmoten.rx.Functions类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java SoundReleasedException类代码示例发布时间:2022-05-22
下一篇:
Java Pair类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap