In an application, I use long polling to an external HTTP endpoint. I do this using Spring's reactive WebClient
. In order to cleanly shutdown when the application stops (and avoid ugly Netty stack traces), I use takeUntil()
with an instance of an EmitterProcessor
that I call onNext()
on when Spring stops my bean (I implement SmartLifecycle
).
The whole thing works something like this:
@Component
@RequiredArgsConstructor
@Slf4j
public class LongPollingMessageReceiver implements SmartLifecycle {
private boolean running = true;
private final EmitterProcessor<Boolean> shutdown = EmitterProcessor.create();
private final BackendMessageReceiver backendMessageReceiver;
public void waitForMessages() {
Mono.defer(() -> backendMessageReceiver.receiveMessages()) // Calls WebClient
.repeat()
.takeUntilOther(shutdown)
.subscribe(event -> {
// do something when the http endpoint answers
});
}
@Override
public int getPhase() {
// We need to cancel the subscriptions before Reactor/Netty shuts down.
// Using @PreDestroy does not work because it is called *after* the Reactor/Netty shutdown.
return 0;
}
@Override
public void start() {
// Not needed
}
@Override
public void stop() {
log.info("Stopping message subscriptions");
shutdown.onNext(true);
shutdown.onComplete();
running = false;
}
@Override
public boolean isRunning() {
return running;
}
}
The whole mechanism works fine for now. However, EmitterProcessor
is marked as @Deprecated
and the javadoc says to use a Sink
instead. Sink
does not implement the Publisher
interface and thus can not be passed into takeUntilOther()
.
What am I supposed to do to solve this problem without being stuck on Project Reactor < 3.5 forever?
question from:
https://stackoverflow.com/questions/65920864/terminate-subscription-on-external-event 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…