In the code snippet below, when dispose() is called, the emitter thread is interrupted (an InterruptedException is thrown out of the sleep method).
Observable<Integer> obs = Observable.create(emitter -> {
    for (int i = 0; i < 10; i++) {
        if (emitter.isDisposed()) {
            System.out.println("> exiting.");
            emitter.onComplete();
            return;
        }
        emitter.onNext(i);
        System.out.println("> calculation = " + i);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    emitter.onComplete();
});

Disposable disposable = obs
    .subscribeOn(Schedulers.computation())
    .subscribe(System.out::println);

try {
    Thread.sleep(2000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
disposable.dispose();
From a debugging session I can see that the interrupt originates from the FutureTask, which is cancelled during disposal. There, the thread calling dispose() is compared against the runner thread, and if they do not match, the emitter thread is interrupted. The threads differ here because I used the computation Scheduler.
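To illustrate the mechanism described above outside of RxJava, here is a minimal JDK-only sketch. It is not RxJava's actual disposal code; it only shows that cancelling a running FutureTask with mayInterruptIfRunning = true interrupts the runner thread in the middle of sleep, while cancelling with false lets the sleep finish. The class name CancelInterruptDemo and the helper wasInterrupted are hypothetical names chosen for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelInterruptDemo {

    // Hypothetical helper: runs a sleeping task on its own thread, cancels it
    // from the calling thread, and reports whether the task was interrupted.
    public static boolean wasInterrupted(boolean mayInterruptIfRunning) {
        AtomicBoolean interrupted = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);

        FutureTask<Void> task = new FutureTask<>(() -> {
            started.countDown();
            try {
                Thread.sleep(500); // stands in for the interruptible work
            } catch (InterruptedException e) {
                interrupted.set(true); // the cancel reached us as an interrupt
            } finally {
                finished.countDown();
            }
            return null;
        });

        new Thread(task, "runner").start();
        try {
            started.await();                     // make sure the task is running
            task.cancel(mayInterruptIfRunning);  // called from a different thread
            finished.await(5, TimeUnit.SECONDS); // wait for the task body to end
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("cancel(true)  interrupted: " + wasInterrupted(true));
        System.out.println("cancel(false) interrupted: " + wasInterrupted(false));
    }
}
```

With cancel(true) the sleeping task sees an InterruptedException, which matches what I observe on dispose(); with cancel(false) the task body runs to its normal end even though the future itself is marked as cancelled.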
Is there any way to make dispose() not interrupt such an emitter, or is this how it should always be handled? The problem I see with this approach arises when I have an interruptible operation (simulated here by sleep) that I want to complete normally before calling onComplete().