• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java Disposable类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中reactor.core.Disposable的典型用法代码示例。如果您正苦于以下问题:Java Disposable类的具体用法?Java Disposable怎么用?Java Disposable使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



Disposable类属于reactor.core包,在下文中一共展示了Disposable类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: requestPressure

import reactor.core.Disposable; //导入依赖的package包/类
@Override
public Mono<NumberProto.Number> requestPressure(Flux<NumberProto.Number> request) {
    if (explicitCancel.get()) {
        // Process a very long sequence
        Disposable subscription = request.subscribe(n -> System.out.println("S: " + n.getNumber(0)));
        return Mono
                .just(protoNum(-1))
                .delayElement(Duration.ofMillis(250))
                // Explicitly cancel by disposing the subscription
                .doOnSuccess(x -> subscription.dispose());
    } else {
        // Process some of a very long sequence and cancel implicitly with a take(10)
        return request.map(req -> req.getNumber(0))
                .doOnNext(System.out::println)
                .take(10)
                .last(-1)
                .map(CancellationPropagationIntegrationTest::protoNum);
    }
}
 
开发者ID:salesforce,项目名称:reactive-grpc,代码行数:20,代码来源:CancellationPropagationIntegrationTest.java


示例2: clientCanCancelServerStreamExplicitly

import reactor.core.Disposable; //导入依赖的package包/类
@Test
public void clientCanCancelServerStreamExplicitly() throws InterruptedException {
    AtomicInteger lastNumberConsumed = new AtomicInteger(Integer.MAX_VALUE);
    ReactorNumbersGrpc.ReactorNumbersStub stub = ReactorNumbersGrpc.newReactorStub(channel);
    Flux<NumberProto.Number> test = stub
            .responsePressure(Mono.just(Empty.getDefaultInstance()))
            .doOnNext(number -> {lastNumberConsumed.set(number.getNumber(0)); System.out.println("C: " + number.getNumber(0));})
            .doOnError(throwable -> System.out.println(throwable.getMessage()))
            .doOnComplete(() -> System.out.println("Completed"))
            .doOnCancel(() -> System.out.println("Client canceled"));

    Disposable subscription = test.publish().connect();

    Thread.sleep(1000);
    subscription.dispose();
    Thread.sleep(1000);

    // Cancellation may or may not deliver the last generated message due to delays in the gRPC processing thread
    assertThat(Math.abs(lastNumberConsumed.get() - svc.getLastNumberProduced())).isLessThanOrEqualTo(3);
    assertThat(svc.wasCanceled()).isTrue();
}
 
开发者ID:salesforce,项目名称:reactive-grpc,代码行数:22,代码来源:CancellationPropagationIntegrationTest.java


示例3: clientCanCancelServerStreamImplicitly

import reactor.core.Disposable; //导入依赖的package包/类
@Test
public void clientCanCancelServerStreamImplicitly() throws InterruptedException {
    ReactorNumbersGrpc.ReactorNumbersStub stub = ReactorNumbersGrpc.newReactorStub(channel);
    Flux<NumberProto.Number> test = stub
            .responsePressure(Mono.just(Empty.getDefaultInstance()))
            .doOnNext(number -> System.out.println(number.getNumber(0)))
            .doOnError(throwable -> System.out.println(throwable.getMessage()))
            .doOnComplete(() -> System.out.println("Completed"))
            .doOnCancel(() -> System.out.println("Client canceled"))
            .take(10);

    Disposable subscription = test.publish().connect();

    Thread.sleep(1000);

    assertThat(svc.wasCanceled()).isTrue();
}
 
开发者ID:salesforce,项目名称:reactive-grpc,代码行数:18,代码来源:CancellationPropagationIntegrationTest.java


示例4: main

import reactor.core.Disposable; //导入依赖的package包/类
public static void main(String[] args) throws IOException {

		Flux<String> flux = client.get() //
				.uri("/sse/messages") //
				.retrieve() //
				.bodyToFlux(String.class) //
				.doOnNext(System.out::println);

		System.out.println("Subscribing to SSE");
		Disposable subscription = flux.subscribe();

		System.out.println("Press any key to terminate subscription...");
		System.in.read();

		subscription.dispose();
	}
 
开发者ID:mp911de,项目名称:reactive-spring,代码行数:17,代码来源:WorkshopSseClient.java


示例5: coordinatorReachableThroughCacheInnerSubscriptionsOnly

import reactor.core.Disposable; //导入依赖的package包/类
@Test
public void coordinatorReachableThroughCacheInnerSubscriptionsOnly() throws InterruptedException {
	TestPublisher<Integer> source = TestPublisher.create();

	MonoCacheTime<Integer> cached = new MonoCacheTime<>(source.mono(),
			Duration.ofMillis(100), //short cache TTL should trigger state change if source is not never
			Schedulers.parallel());

	Disposable d1 = cached.subscribe();
	cached.subscribe();

	WeakReference<Signal<Integer>> refCoordinator = new WeakReference<>(cached.state);

	assertThat(refCoordinator.get()).isInstanceOf(MonoCacheTime.CoordinatorSubscriber.class);

	Thread.sleep(150);
	source = null;
	cached = null;
	System.gc();

	assertThat(refCoordinator.get()).isInstanceOf(MonoCacheTime.CoordinatorSubscriber.class);
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:23,代码来源:MonoCacheTimeTest.java


示例6: onCancel

import reactor.core.Disposable; //导入依赖的package包/类
@Override
public MonoSink<T> onCancel(Disposable d) {
	Objects.requireNonNull(d, "onCancel");
	SinkDisposable sd = new SinkDisposable(null, d);
	if (!DISPOSABLE.compareAndSet(this, null, sd)) {
		Disposable c = disposable;
		if (c instanceof SinkDisposable) {
			SinkDisposable current = (SinkDisposable) c;
			if (current.onCancel == null) {
				current.onCancel = d;
			}
			else {
				d.dispose();
			}
		}
	}
	return this;
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:19,代码来源:MonoCreate.java


示例7: set

import reactor.core.Disposable; //导入依赖的package包/类
/**
 * Atomically push the field to a {@link Disposable} and dispose the old content.
 *
 * @param updater the target field updater
 * @param holder the target instance holding the field
 * @param newValue the new Disposable to push
 * @return true if successful, false if the field contains the {@link #DISPOSED} instance.
 */
public static <T> boolean set(AtomicReferenceFieldUpdater<T, Disposable> updater, T holder, @Nullable Disposable newValue) {
	for (;;) {
		Disposable current = updater.get(holder);
		if (current == DISPOSED) {
			if (newValue != null) {
				newValue.dispose();
			}
			return false;
		}
		if (updater.compareAndSet(holder, current, newValue)) {
			if (current != null) {
				current.dispose();
			}
			return true;
		}
	}
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:26,代码来源:OperatorDisposables.java


示例8: request

import reactor.core.Disposable; //导入依赖的package包/类
@Override
public void request(long n) {
	if (Operators.validate(n)) {
		if (ONCE.compareAndSet(this, 0, 1)) {
			try {
				Disposable f = scheduler.schedule(this);
				if (!FUTURE.compareAndSet(this,
						null,
						f) && future != FINISHED && future != OperatorDisposables.DISPOSED) {
					f.dispose();
				}
			}
			catch (RejectedExecutionException ree) {
				if (future != FINISHED && future != OperatorDisposables.DISPOSED) {
					actual.onError(Operators.onRejectedExecution(ree,
							this,
							null,
							value, actual.currentContext()));
				}
			}
		}
	}
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:24,代码来源:FluxSubscribeOnValue.java


示例9: coordinatorCacheInnerDisposedOrNoReferenceNoLeak

import reactor.core.Disposable; //导入依赖的package包/类
@Test
public void coordinatorCacheInnerDisposedOrNoReferenceNoLeak() throws InterruptedException {
	TestPublisher<Integer> source = TestPublisher.create();

	MonoCacheTime<Integer> cached = new MonoCacheTime<>(source.mono(),
			Duration.ofMillis(100), //short cache TTL should trigger state change if source is not never
			Schedulers.parallel());

	Disposable d1 = cached.subscribe();
	cached.subscribe();

	WeakReference<Signal<Integer>> refCoordinator = new WeakReference<>(cached.state);

	assertThat(refCoordinator.get()).isInstanceOf(MonoCacheTime.CoordinatorSubscriber.class);

	Thread.sleep(150);
	source = null;
	cached = null;
	d1.dispose();
	System.gc();

	assertThat(refCoordinator.get()).isNull();
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:24,代码来源:MonoCacheTimeTest.java


示例10: subscribersComeAndGoBelowThreshold

import reactor.core.Disposable; //导入依赖的package包/类
@Test
public void subscribersComeAndGoBelowThreshold() {
	Flux<Integer> p = Flux.range(1, 5).publish().refCount(2);

	Disposable r = p.subscribe();
	r.dispose();
	p.subscribe().dispose();
	p.subscribe().dispose();
	p.subscribe().dispose();
	p.subscribe().dispose();

	AssertSubscriber<Integer> ts1 = AssertSubscriber.create();
	p.subscribe(ts1);

	ts1.assertValueCount(0);

	AssertSubscriber<Integer> ts2 = AssertSubscriber.create();
	p.subscribe(ts2);

	ts1.assertValues(1, 2, 3, 4, 5);
	ts2.assertValues(1, 2, 3, 4, 5);
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:23,代码来源:FluxRefCountTest.java


示例11: workerSchedulePeriodically

import reactor.core.Disposable; //导入依赖的package包/类
static Disposable workerSchedulePeriodically(ScheduledExecutorService exec,
		Disposable.Composite tasks,
		Runnable task,
		long initialDelay,
		long period,
		TimeUnit unit) {

	PeriodicWorkerTask sr = new PeriodicWorkerTask(task, tasks);
	if (!tasks.add(sr)) {
		throw Exceptions.failWithRejected();
	}

	try {
		Future<?> f = exec.scheduleAtFixedRate(sr, initialDelay, period, unit);
		sr.setFuture(f);
	}
	catch (RejectedExecutionException ex) {
		sr.dispose();
		//RejectedExecutionException are propagated up
		throw ex;
	}

	return sr;
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:25,代码来源:Schedulers.java


示例12: subscribersComeAndGoBelowThreshold

import reactor.core.Disposable; //导入依赖的package包/类
@Test
public void subscribersComeAndGoBelowThreshold() {
	Flux<Integer> p = Flux.range(1, 5).publish().refCount(2, Duration.ofMillis(500));

	Disposable r = p.subscribe();
	r.dispose();
	p.subscribe().dispose();
	p.subscribe().dispose();
	p.subscribe().dispose();
	p.subscribe().dispose();
	
	AssertSubscriber<Integer> ts1 = AssertSubscriber.create();
	p.subscribe(ts1);

	ts1.assertValueCount(0);

	AssertSubscriber<Integer> ts2 = AssertSubscriber.create();
	p.subscribe(ts2);

	ts1.assertValues(1, 2, 3, 4, 5);
	ts2.assertValues(1, 2, 3, 4, 5);
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:23,代码来源:FluxRefCountGraceTest.java


示例13: schedule

import reactor.core.Disposable; //导入依赖的package包/类
@Override
public Disposable schedule(Runnable task) {
	if(terminated){
		throw Exceptions.failWithRejected();
	}
	Objects.requireNonNull(task, "task");
	ExecutorPlainRunnable r = new ExecutorPlainRunnable(task);
	//RejectedExecutionException are propagated up, but since Executor doesn't from
	//failing tasks we'll also wrap the execute call in a try catch:
	try {
		executor.execute(r);
	}
	catch (Throwable ex) {
		terminated = true;
		Schedulers.handleError(ex);
		throw Exceptions.failWithRejected(ex);
	}
	return r;
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:20,代码来源:ExecutorScheduler.java


示例14: createWorker

import reactor.core.Disposable; //导入依赖的package包/类
@Override public Worker createWorker() {
  return new Worker() {
    private final List<Runnable> workerTasks = new ArrayList<>();

    @Override public Disposable schedule(Runnable task) {
      workerTasks.add(task);
      tasks.add(task);
      return () -> tasks.remove(task);
    }

    @Override public void dispose() {
      tasks.removeAll(workerTasks);
    }
  };
}
 
开发者ID:JakeWharton,项目名称:retrofit2-reactor-adapter,代码行数:16,代码来源:TestScheduler.java


示例15: cancelReceiver

import reactor.core.Disposable; //导入依赖的package包/类
final boolean cancelReceiver() {
	Disposable c = receiverCancel;
	if (c != CANCELLED) {
		c = CANCEL.getAndSet(this, CANCELLED);
		if (c != CANCELLED) {
			c.dispose();
			return true;
		}
	}
	return false;
}
 
开发者ID:reactor,项目名称:reactor-netty,代码行数:12,代码来源:FluxReceive.java


示例16: schedule

import reactor.core.Disposable; //导入依赖的package包/类
@Override
public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
	if (shutdown) {
		throw Exceptions.failWithRejected();
	}
	return directWorker.schedule(task, delay, unit);
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:8,代码来源:VirtualTimeScheduler.java


示例17: subscriptionCancelNullifiesActual

import reactor.core.Disposable; //导入依赖的package包/类
@Test
public void subscriptionCancelNullifiesActual() {
	UnicastProcessor<String> processor = UnicastProcessor.create();

	assertThat(processor.downstreamCount())
			.as("before subscribe")
			.isZero();

	LambdaSubscriber<String> subscriber = new LambdaSubscriber<>(null, null, null, null);
	Disposable subscription = processor.subscribeWith(subscriber);

	assertThat(processor.downstreamCount())
			.as("after subscribe")
			.isEqualTo(1);
	assertThat(processor.actual())
			.as("after subscribe")
			.isSameAs(subscriber);

	subscription.dispose();

	assertThat(processor.downstreamCount())
			.as("after subscription cancel")
			.isZero();
	assertThat(processor.actual())
			.as("after subscription cancel")
			.isNull();
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:28,代码来源:UnicastProcessorTest.java


示例18: replace

import reactor.core.Disposable; //导入依赖的package包/类
static boolean replace(AtomicReference<Disposable> ref, @Nullable Disposable c) {
	for (; ; ) {
		Disposable current = ref.get();
		if (current == CANCELLED) {
			if (c != null) {
				c.dispose();
			}
			return false;
		}
		if (ref.compareAndSet(current, c)) {
			return true;
		}
	}
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:15,代码来源:VirtualTimeScheduler.java


示例19: wasCancelledMono

import reactor.core.Disposable; //导入依赖的package包/类
@Test
public void wasCancelledMono() {
	PublisherProbe<Void> probe = PublisherProbe.of(Mono.never());
	Disposable d = probe.mono().subscribe();

	assertThat(probe.wasCancelled()).isFalse();

	d.dispose();

	assertThat(probe.wasCancelled()).isTrue();
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:12,代码来源:PublisherProbeTest.java


示例20: schedule

import reactor.core.Disposable; //导入依赖的package包/类
@Override
public Disposable schedule(Runnable task) {
	CachedService cached = pick();

	return Schedulers.directSchedule(cached.exec,
			new DirectScheduleTask(task, cached),
			0L,
			TimeUnit.MILLISECONDS);
}
 
开发者ID:reactor,项目名称:reactor-core,代码行数:10,代码来源:ElasticScheduler.java



注:本文中的reactor.core.Disposable类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java IRecordProcessor类代码示例发布时间:2022-05-21
下一篇:
Java IResourceProxy类代码示例发布时间:2022-05-21
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap