This article collects typical usage examples of the reactor.core.Disposable interface in Java. If you are wondering how Disposable is used in practice, the selected examples below may help.
Disposable belongs to the reactor.core package. The 20 code examples that follow are sorted by popularity by default.
Example 1: requestPressure
import reactor.core.Disposable; // import the required package/class
@Override
public Mono<NumberProto.Number> requestPressure(Flux<NumberProto.Number> request) {
    if (explicitCancel.get()) {
        // Process a very long sequence
        Disposable subscription = request.subscribe(n -> System.out.println("S: " + n.getNumber(0)));
        return Mono
                .just(protoNum(-1))
                .delayElement(Duration.ofMillis(250))
                // Explicitly cancel by disposing the subscription
                .doOnSuccess(x -> subscription.dispose());
    } else {
        // Process some of a very long sequence and cancel implicitly with a take(10)
        return request.map(req -> req.getNumber(0))
                .doOnNext(System.out::println)
                .take(10)
                .last(-1)
                .map(CancellationPropagationIntegrationTest::protoNum);
    }
}
Developer ID: salesforce, project: reactive-grpc, lines of code: 20, source: CancellationPropagationIntegrationTest.java
Example 2: clientCanCancelServerStreamExplicitly
import reactor.core.Disposable; // import the required package/class
@Test
public void clientCanCancelServerStreamExplicitly() throws InterruptedException {
    AtomicInteger lastNumberConsumed = new AtomicInteger(Integer.MAX_VALUE);
    ReactorNumbersGrpc.ReactorNumbersStub stub = ReactorNumbersGrpc.newReactorStub(channel);
    Flux<NumberProto.Number> test = stub
            .responsePressure(Mono.just(Empty.getDefaultInstance()))
            .doOnNext(number -> {
                lastNumberConsumed.set(number.getNumber(0));
                System.out.println("C: " + number.getNumber(0));
            })
            .doOnError(throwable -> System.out.println(throwable.getMessage()))
            .doOnComplete(() -> System.out.println("Completed"))
            .doOnCancel(() -> System.out.println("Client canceled"));
    Disposable subscription = test.publish().connect();
    Thread.sleep(1000);
    subscription.dispose();
    Thread.sleep(1000);
    // Cancellation may or may not deliver the last generated message due to delays in the gRPC processing thread
    assertThat(Math.abs(lastNumberConsumed.get() - svc.getLastNumberProduced())).isLessThanOrEqualTo(3);
    assertThat(svc.wasCanceled()).isTrue();
}
Developer ID: salesforce, project: reactive-grpc, lines of code: 22, source: CancellationPropagationIntegrationTest.java
Example 3: clientCanCancelServerStreamImplicitly
import reactor.core.Disposable; // import the required package/class
@Test
public void clientCanCancelServerStreamImplicitly() throws InterruptedException {
    ReactorNumbersGrpc.ReactorNumbersStub stub = ReactorNumbersGrpc.newReactorStub(channel);
    Flux<NumberProto.Number> test = stub
            .responsePressure(Mono.just(Empty.getDefaultInstance()))
            .doOnNext(number -> System.out.println(number.getNumber(0)))
            .doOnError(throwable -> System.out.println(throwable.getMessage()))
            .doOnComplete(() -> System.out.println("Completed"))
            .doOnCancel(() -> System.out.println("Client canceled"))
            .take(10);
    Disposable subscription = test.publish().connect();
    Thread.sleep(1000);
    assertThat(svc.wasCanceled()).isTrue();
}
Developer ID: salesforce, project: reactive-grpc, lines of code: 18, source: CancellationPropagationIntegrationTest.java
Example 4: main
import reactor.core.Disposable; // import the required package/class
public static void main(String[] args) throws IOException {
    Flux<String> flux = client.get() //
            .uri("/sse/messages") //
            .retrieve() //
            .bodyToFlux(String.class) //
            .doOnNext(System.out::println);
    System.out.println("Subscribing to SSE");
    Disposable subscription = flux.subscribe();
    System.out.println("Press any key to terminate subscription...");
    System.in.read();
    subscription.dispose();
}
Developer ID: mp911de, project: reactive-spring, lines of code: 17, source: WorkshopSseClient.java
Example 5: coordinatorReachableThroughCacheInnerSubscriptionsOnly
import reactor.core.Disposable; // import the required package/class
@Test
public void coordinatorReachableThroughCacheInnerSubscriptionsOnly() throws InterruptedException {
    TestPublisher<Integer> source = TestPublisher.create();
    MonoCacheTime<Integer> cached = new MonoCacheTime<>(source.mono(),
            Duration.ofMillis(100), //short cache TTL should trigger state change if source is not never
            Schedulers.parallel());
    Disposable d1 = cached.subscribe();
    cached.subscribe();
    WeakReference<Signal<Integer>> refCoordinator = new WeakReference<>(cached.state);
    assertThat(refCoordinator.get()).isInstanceOf(MonoCacheTime.CoordinatorSubscriber.class);
    Thread.sleep(150);
    source = null;
    cached = null;
    System.gc();
    assertThat(refCoordinator.get()).isInstanceOf(MonoCacheTime.CoordinatorSubscriber.class);
}
Developer ID: reactor, project: reactor-core, lines of code: 23, source: MonoCacheTimeTest.java
Example 6: onCancel
import reactor.core.Disposable; // import the required package/class
@Override
public MonoSink<T> onCancel(Disposable d) {
    Objects.requireNonNull(d, "onCancel");
    SinkDisposable sd = new SinkDisposable(null, d);
    if (!DISPOSABLE.compareAndSet(this, null, sd)) {
        Disposable c = disposable;
        if (c instanceof SinkDisposable) {
            SinkDisposable current = (SinkDisposable) c;
            if (current.onCancel == null) {
                current.onCancel = d;
            }
            else {
                d.dispose();
            }
        }
    }
    return this;
}
Developer ID: reactor, project: reactor-core, lines of code: 19, source: MonoCreate.java
Example 7: set
import reactor.core.Disposable; // import the required package/class
/**
 * Atomically set the field to a {@link Disposable} and dispose the old content.
 *
 * @param updater the target field updater
 * @param holder the target instance holding the field
 * @param newValue the new Disposable to set
 * @return true if successful, false if the field contains the {@link #DISPOSED} instance.
 */
public static <T> boolean set(AtomicReferenceFieldUpdater<T, Disposable> updater, T holder, @Nullable Disposable newValue) {
    for (;;) {
        Disposable current = updater.get(holder);
        if (current == DISPOSED) {
            if (newValue != null) {
                newValue.dispose();
            }
            return false;
        }
        if (updater.compareAndSet(holder, current, newValue)) {
            if (current != null) {
                current.dispose();
            }
            return true;
        }
    }
}
Developer ID: reactor, project: reactor-core, lines of code: 26, source: OperatorDisposables.java
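The set helper in Example 7 swaps a new Disposable into a field and disposes whatever it displaced, unless the field already holds the DISPOSED sentinel. The following is a minimal, dependency-free sketch of that idea, not Reactor's implementation. It assumes a hypothetical Resource interface standing in for reactor.core.Disposable and uses a plain AtomicReference instead of a field updater; the names SetAndDisposeSketch, Resource, and disposeAll are illustrative only.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for reactor.core.Disposable (an assumption, not the real interface).
interface Resource {
    void dispose();
}

final class SetAndDisposeSketch {
    // Sentinel marking a holder that has been terminally disposed.
    static final Resource DISPOSED = () -> { };

    // Swaps in newValue and disposes the previous content.
    // Returns false (and disposes newValue) if the holder is already DISPOSED.
    static boolean set(AtomicReference<Resource> holder, Resource newValue) {
        for (;;) {
            Resource current = holder.get();
            if (current == DISPOSED) {
                if (newValue != null) {
                    newValue.dispose();
                }
                return false;
            }
            // Retry the loop if another thread changed the holder in between.
            if (holder.compareAndSet(current, newValue)) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
    }

    // Terminally disposes the holder; later set calls are rejected.
    static void disposeAll(AtomicReference<Resource> holder) {
        Resource previous = holder.getAndSet(DISPOSED);
        if (previous != null && previous != DISPOSED) {
            previous.dispose();
        }
    }

    public static void main(String[] args) {
        AtomicReference<Resource> holder = new AtomicReference<>();
        AtomicInteger disposedCount = new AtomicInteger();
        Resource counting = disposedCount::incrementAndGet;

        System.out.println(set(holder, counting)); // stored into the empty holder
        System.out.println(set(holder, counting)); // replaces and disposes the previous value
        disposeAll(holder);                        // disposes the current value, installs the sentinel
        System.out.println(set(holder, counting)); // rejected; the new value is disposed instead
        System.out.println(disposedCount.get());
    }
}
```

The sentinel is what makes late arrivals safe: a resource offered after terminal disposal is disposed immediately instead of leaking.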
Example 8: request
import reactor.core.Disposable; // import the required package/class
@Override
public void request(long n) {
    if (Operators.validate(n)) {
        if (ONCE.compareAndSet(this, 0, 1)) {
            try {
                Disposable f = scheduler.schedule(this);
                if (!FUTURE.compareAndSet(this, null, f)
                        && future != FINISHED && future != OperatorDisposables.DISPOSED) {
                    f.dispose();
                }
            }
            catch (RejectedExecutionException ree) {
                if (future != FINISHED && future != OperatorDisposables.DISPOSED) {
                    actual.onError(Operators.onRejectedExecution(ree,
                            this,
                            null,
                            value, actual.currentContext()));
                }
            }
        }
    }
}
Developer ID: reactor, project: reactor-core, lines of code: 24, source: FluxSubscribeOnValue.java
Example 9: coordinatorCacheInnerDisposedOrNoReferenceNoLeak
import reactor.core.Disposable; // import the required package/class
@Test
public void coordinatorCacheInnerDisposedOrNoReferenceNoLeak() throws InterruptedException {
    TestPublisher<Integer> source = TestPublisher.create();
    MonoCacheTime<Integer> cached = new MonoCacheTime<>(source.mono(),
            Duration.ofMillis(100), //short cache TTL should trigger state change if source is not never
            Schedulers.parallel());
    Disposable d1 = cached.subscribe();
    cached.subscribe();
    WeakReference<Signal<Integer>> refCoordinator = new WeakReference<>(cached.state);
    assertThat(refCoordinator.get()).isInstanceOf(MonoCacheTime.CoordinatorSubscriber.class);
    Thread.sleep(150);
    source = null;
    cached = null;
    d1.dispose();
    System.gc();
    assertThat(refCoordinator.get()).isNull();
}
Developer ID: reactor, project: reactor-core, lines of code: 24, source: MonoCacheTimeTest.java
Example 10: subscribersComeAndGoBelowThreshold
import reactor.core.Disposable; // import the required package/class
@Test
public void subscribersComeAndGoBelowThreshold() {
    Flux<Integer> p = Flux.range(1, 5).publish().refCount(2);
    Disposable r = p.subscribe();
    r.dispose();
    p.subscribe().dispose();
    p.subscribe().dispose();
    p.subscribe().dispose();
    p.subscribe().dispose();
    AssertSubscriber<Integer> ts1 = AssertSubscriber.create();
    p.subscribe(ts1);
    ts1.assertValueCount(0);
    AssertSubscriber<Integer> ts2 = AssertSubscriber.create();
    p.subscribe(ts2);
    ts1.assertValues(1, 2, 3, 4, 5);
    ts2.assertValues(1, 2, 3, 4, 5);
}
Developer ID: reactor, project: reactor-core, lines of code: 23, source: FluxRefCountTest.java
Example 11: workerSchedulePeriodically
import reactor.core.Disposable; // import the required package/class
static Disposable workerSchedulePeriodically(ScheduledExecutorService exec,
        Disposable.Composite tasks,
        Runnable task,
        long initialDelay,
        long period,
        TimeUnit unit) {
    PeriodicWorkerTask sr = new PeriodicWorkerTask(task, tasks);
    if (!tasks.add(sr)) {
        throw Exceptions.failWithRejected();
    }
    try {
        Future<?> f = exec.scheduleAtFixedRate(sr, initialDelay, period, unit);
        sr.setFuture(f);
    }
    catch (RejectedExecutionException ex) {
        sr.dispose();
        //RejectedExecutionException are propagated up
        throw ex;
    }
    return sr;
}
Developer ID: reactor, project: reactor-core, lines of code: 25, source: Schedulers.java
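Example 11 wraps a periodic task so that disposing the returned handle cancels the underlying Future. A reduced sketch of that relationship, using only java.util.concurrent, might look as follows. It leaves out the Disposable.Composite bookkeeping of the original, and the Cancellable interface, the PeriodicTaskSketch class, and the runThenDispose helper are hypothetical names introduced for illustration, not Reactor API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for reactor.core.Disposable (an assumption, not the real interface).
interface Cancellable {
    void dispose();
    boolean isDisposed();
}

final class PeriodicTaskSketch {
    // Schedules the task at a fixed rate and returns a handle that cancels it.
    // A RejectedExecutionException from the executor propagates to the caller.
    static Cancellable schedulePeriodically(ScheduledExecutorService exec, Runnable task,
                                            long initialDelay, long period, TimeUnit unit) {
        Future<?> future = exec.scheduleAtFixedRate(task, initialDelay, period, unit);
        AtomicBoolean disposed = new AtomicBoolean();
        return new Cancellable() {
            @Override
            public void dispose() {
                // Only the first dispose call cancels the future.
                if (disposed.compareAndSet(false, true)) {
                    future.cancel(false);
                }
            }

            @Override
            public boolean isDisposed() {
                return disposed.get();
            }
        };
    }

    // Lets the task fire the given number of times, then disposes the handle.
    // Returns true if the task fired in time and the handle reports disposed.
    static boolean runThenDispose(int runs) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            Cancellable handle = schedulePeriodically(exec, latch::countDown, 0, 10, TimeUnit.MILLISECONDS);
            boolean fired = latch.await(5, TimeUnit.SECONDS);
            handle.dispose();
            return fired && handle.isDisposed();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runThenDispose(3));
    }
}
```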
Example 12: subscribersComeAndGoBelowThreshold
import reactor.core.Disposable; // import the required package/class
@Test
public void subscribersComeAndGoBelowThreshold() {
    Flux<Integer> p = Flux.range(1, 5).publish().refCount(2, Duration.ofMillis(500));
    Disposable r = p.subscribe();
    r.dispose();
    p.subscribe().dispose();
    p.subscribe().dispose();
    p.subscribe().dispose();
    p.subscribe().dispose();
    AssertSubscriber<Integer> ts1 = AssertSubscriber.create();
    p.subscribe(ts1);
    ts1.assertValueCount(0);
    AssertSubscriber<Integer> ts2 = AssertSubscriber.create();
    p.subscribe(ts2);
    ts1.assertValues(1, 2, 3, 4, 5);
    ts2.assertValues(1, 2, 3, 4, 5);
}
Developer ID: reactor, project: reactor-core, lines of code: 23, source: FluxRefCountGraceTest.java
Example 13: schedule
import reactor.core.Disposable; // import the required package/class
@Override
public Disposable schedule(Runnable task) {
    if (terminated) {
        throw Exceptions.failWithRejected();
    }
    Objects.requireNonNull(task, "task");
    ExecutorPlainRunnable r = new ExecutorPlainRunnable(task);
    // RejectedExecutionExceptions are propagated up, but since Executor doesn't protect from
    // failing tasks, we also wrap the execute call in a try/catch:
    try {
        executor.execute(r);
    }
    catch (Throwable ex) {
        terminated = true;
        Schedulers.handleError(ex);
        throw Exceptions.failWithRejected(ex);
    }
    return r;
}
Developer ID: reactor, project: reactor-core, lines of code: 20, source: ExecutorScheduler.java
Example 14: createWorker
import reactor.core.Disposable; // import the required package/class
@Override
public Worker createWorker() {
    return new Worker() {
        private final List<Runnable> workerTasks = new ArrayList<>();
        @Override
        public Disposable schedule(Runnable task) {
            workerTasks.add(task);
            tasks.add(task);
            return () -> tasks.remove(task);
        }
        @Override
        public void dispose() {
            tasks.removeAll(workerTasks);
        }
    };
}
Developer ID: JakeWharton, project: retrofit2-reactor-adapter, lines of code: 16, source: TestScheduler.java
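The worker in Example 14 returns a Disposable whose only job is to remove the task from a pending queue, so that a test can later decide which tasks actually run. The sketch below shows that queue-and-remove idea with plain JDK types. It is an assumption-laden illustration rather than the TestScheduler from retrofit2-reactor-adapter: a Runnable plays the role of the returned Disposable, and ManualSchedulerSketch and triggerActions are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class ManualSchedulerSketch {
    // Tasks that have been scheduled but not yet run.
    private final Queue<Runnable> tasks = new ArrayDeque<>();

    // Enqueues a task. Running the returned handle removes the task again,
    // which mirrors disposing the Disposable returned by a worker.
    Runnable schedule(Runnable task) {
        tasks.add(task);
        return () -> tasks.remove(task);
    }

    // Runs every task still pending, in FIFO order, and returns how many ran.
    int triggerActions() {
        int ran = 0;
        Runnable next;
        while ((next = tasks.poll()) != null) {
            next.run();
            ran++;
        }
        return ran;
    }

    public static void main(String[] args) {
        ManualSchedulerSketch scheduler = new ManualSchedulerSketch();
        List<String> log = new ArrayList<>();

        scheduler.schedule(() -> log.add("first"));
        Runnable cancelSecond = scheduler.schedule(() -> log.add("second"));

        cancelSecond.run();                 // un-schedule the second task before it runs
        int ran = scheduler.triggerActions();

        System.out.println(ran + " task(s) ran: " + log);
    }
}
```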
Example 15: cancelReceiver
import reactor.core.Disposable; // import the required package/class
final boolean cancelReceiver() {
    Disposable c = receiverCancel;
    if (c != CANCELLED) {
        c = CANCEL.getAndSet(this, CANCELLED);
        if (c != CANCELLED) {
            c.dispose();
            return true;
        }
    }
    return false;
}
Developer ID: reactor, project: reactor-netty, lines of code: 12, source: FluxReceive.java
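cancelReceiver in Example 15 relies on getAndSet with a CANCELLED sentinel so that, however many callers race, only one of them actually runs the cancel action and gets true back. The sketch below reproduces that check-then-swap shape with an AtomicReference. It assumes a Runnable in place of reactor.core.Disposable, and CancelOnceSketch is a hypothetical class name, not part of reactor-netty.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class CancelOnceSketch {
    // Sentinel meaning "already cancelled".
    private static final Runnable CANCELLED = () -> { };

    // Holds the cancel action until the first successful cancel swaps in the sentinel.
    private final AtomicReference<Runnable> receiverCancel;

    CancelOnceSketch(Runnable cancelAction) {
        this.receiverCancel = new AtomicReference<>(Objects.requireNonNull(cancelAction, "cancelAction"));
    }

    // Returns true only for the single call that actually ran the cancel action.
    boolean cancelReceiver() {
        Runnable c = receiverCancel.get();
        if (c != CANCELLED) {
            // getAndSet hands the real action to exactly one caller.
            c = receiverCancel.getAndSet(CANCELLED);
            if (c != CANCELLED) {
                c.run();
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        CancelOnceSketch receiver = new CancelOnceSketch(runs::incrementAndGet);

        System.out.println(receiver.cancelReceiver()); // this call performs the cancel
        System.out.println(receiver.cancelReceiver()); // later calls find the sentinel
        System.out.println(runs.get());                // number of times the action ran
    }
}
```

The plain read before getAndSet is only a fast path; correctness comes from the atomic swap, which is why the result is checked against the sentinel a second time.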
Example 16: schedule
import reactor.core.Disposable; // import the required package/class
@Override
public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
    if (shutdown) {
        throw Exceptions.failWithRejected();
    }
    return directWorker.schedule(task, delay, unit);
}
Developer ID: reactor, project: reactor-core, lines of code: 8, source: VirtualTimeScheduler.java
Example 17: subscriptionCancelNullifiesActual
import reactor.core.Disposable; // import the required package/class
@Test
public void subscriptionCancelNullifiesActual() {
    UnicastProcessor<String> processor = UnicastProcessor.create();
    assertThat(processor.downstreamCount())
            .as("before subscribe")
            .isZero();
    LambdaSubscriber<String> subscriber = new LambdaSubscriber<>(null, null, null, null);
    Disposable subscription = processor.subscribeWith(subscriber);
    assertThat(processor.downstreamCount())
            .as("after subscribe")
            .isEqualTo(1);
    assertThat(processor.actual())
            .as("after subscribe")
            .isSameAs(subscriber);
    subscription.dispose();
    assertThat(processor.downstreamCount())
            .as("after subscription cancel")
            .isZero();
    assertThat(processor.actual())
            .as("after subscription cancel")
            .isNull();
}
Developer ID: reactor, project: reactor-core, lines of code: 28, source: UnicastProcessorTest.java
Example 18: replace
import reactor.core.Disposable; // import the required package/class
static boolean replace(AtomicReference<Disposable> ref, @Nullable Disposable c) {
    for (;;) {
        Disposable current = ref.get();
        if (current == CANCELLED) {
            if (c != null) {
                c.dispose();
            }
            return false;
        }
        if (ref.compareAndSet(current, c)) {
            return true;
        }
    }
}
Developer ID: reactor, project: reactor-core, lines of code: 15, source: VirtualTimeScheduler.java
Example 19: wasCancelledMono
import reactor.core.Disposable; // import the required package/class
@Test
public void wasCancelledMono() {
    PublisherProbe<Void> probe = PublisherProbe.of(Mono.never());
    Disposable d = probe.mono().subscribe();
    assertThat(probe.wasCancelled()).isFalse();
    d.dispose();
    assertThat(probe.wasCancelled()).isTrue();
}
Developer ID: reactor, project: reactor-core, lines of code: 12, source: PublisherProbeTest.java
Example 20: schedule
import reactor.core.Disposable; // import the required package/class
@Override
public Disposable schedule(Runnable task) {
    CachedService cached = pick();
    return Schedulers.directSchedule(cached.exec,
            new DirectScheduleTask(task, cached),
            0L,
            TimeUnit.MILLISECONDS);
}
Developer ID: reactor, project: reactor-core, lines of code: 10, source: ElasticScheduler.java
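Note: The reactor.core.Disposable examples in this article were compiled from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright remains with the original authors; consult each project's License before distributing or using the code. Do not reproduce without permission.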