Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
327 views
in Technique[技术] by (71.8m points)

spring boot - Start processing Flux response from server before completion: is it possible?

I have 2 Spring-Boot-Reactive apps, one server and one client; the client calls the server like so:

Flux<Thing> things = thingsApi.listThings(5);

And I want to have this as a list for later use:

// "extractContent" operation takes 1.5s per "thing"
List<String> thingsContent = things.map(ThingConverter::extractContent)
        .collect(Collectors.toList())
        .block()

On the server side, the endpoint definition looks like this:

@Override
public Mono<ResponseEntity<Flux<Thing>>> listThings(
        @NotNull @Valid @RequestParam(value = "nbThings") Integer nbThings,
        ServerWebExchange exchange
) {
    // "getThings" operation takes 1.5s per "thing"
    Flux<Thing> things = thingsService.getThings(nbThings);
    return Mono.just(new ResponseEntity<>(things, HttpStatus.OK));
}

The signature comes from the Open-API generated code (Spring-Boot server, reactive mode).


What I observe: the client jumps to things.map immediately but only starts processing the Flux after the server has finished sending all the "things".

What I would like: the server should send the "things" as they are generated so that the client can start processing them as they arrive, effectively halving the processing time.

Is there a way to achieve this? I've found many tutorials online for the server part, but none with a java client. I've heard of server-sent events, but can my goal be achieved using a "classic" Open-API endpoint definition that returns a Flux?

The problem seemed too complex to fit a minimal viable example in the question body; full code available for reference on Github.

EDIT: redirect link to main branch after merge of the proposed solution

question from:https://stackoverflow.com/questions/65847037/start-processing-flux-response-from-server-before-completion-is-it-possible

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

I've got it running by changing 2 points:

  • First: I've changed the content type of the response of your /things endpoint, to:
content:
  text/event-stream

Don't forget to change also the default response, else the client will expect the type application/json and will wait for the whole response.

  • Second point: I've changed the return of ThingsService.getThings to this.getThingsFromExistingStream (the method you comment out)

I pushed my changes to a new branch fix-flux-response on your Github, so you can test them directly.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...