Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
139 views
in Technique[技术] by (71.8m points)

java - Terminate subscription on external event

In an application, I use long polling to an external HTTP endpoint. I do this using Spring's reactive WebClient. In order to cleanly shutdown when the application stops (and avoid ugly Netty stack traces), I use takeUntil() with an instance of an EmitterProcessor that I call onNext() on when Spring stops my bean (I implement SmartLifecycle).

The whole thing works something like this:

@Component
@RequiredArgsConstructor
@Slf4j
public class LongPollingMessageReceiver implements SmartLifecycle {
    private boolean running = true;

    private final EmitterProcessor<Boolean> shutdown = EmitterProcessor.create();

    private final BackendMessageReceiver backendMessageReceiver;

    public void waitForMessages() {
        Mono.defer(() -> backendMessageReceiver.receiveMessages()) // Calls WebClient
            .repeat()
            .takeUntilOther(shutdown)
            .subscribe(event -> {
                // do something when the http endpoint answers
            });
    }

    @Override
    public int getPhase() {
        // We need to cancel the subscriptions before Reactor/Netty shuts down.
        // Using @PreDestroy does not work because it is called *after* the Reactor/Netty shutdown.
        return 0;
    }

    @Override
    public void start() {
        // Not needed
    }

    @Override
    public void stop() {
        log.info("Stopping message subscriptions");
        shutdown.onNext(true);
        shutdown.onComplete();
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}

The whole mechanism works fine for now. However, EmitterProcessor is marked as @Deprecated and the javadoc says to use a Sink instead. Sink does not implement the Publisher interface and thus can not be passed into takeUntilOther().

What am I supposed to do to solve this problem without being stuck on Project Reactor < 3.5 forever?

question from:https://stackoverflow.com/questions/65920864/terminate-subscription-on-external-event

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Sinks are intended as the developer-facing API for programmatically triggering reactive events. This wouldn't be very useful if there wasn't a way to present these as a typical Flux or Mono to the rest of the application though.

Sinks.Many has an asFlux() view to that effect. Similarly, Sinks.One and Sinks.Empty have an asMono() view.

That's what you can use to pass to takeUntilOther.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...