java - Apache Camel reactive streams with Bindy - only reading first line

I'm trying to use Apache Camel (version 2.25.3) reactive streams in combination with Spring Boot to read a large CSV file and unmarshal the lines using Bindy. This is "working" in the sense that the application runs and detects files as they appear, but I then only see the first line of the file in my stream. It appears to be Bindy-related, because if I take the unmarshalling out of the equation I get back all the lines of the CSV file in my stream just fine. I have simplified the problem to demonstrate it here on SO. I'm using Spring WebFlux to expose the resulting Publisher.

So my Camel route is as follows:

import lombok.RequiredArgsConstructor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.dataformat.bindy.csv.BindyCsvDataFormat;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

@RequiredArgsConstructor
@Component
public class TransactionLineCsvRoute extends RouteBuilder {
    private final CamelReactiveStreamsService camelRs;

    @Override
    public void configure() {
        var bindy = new BindyCsvDataFormat(LineItem.class);

        from("file:input/?include=.*\.csv&move=successImport&moveFailed=failImport")
                .unmarshal(bindy)
                .to("reactive-streams:lineItems");
    }

    public Flux<LineItem> getLineItemFlux() {
        Publisher<LineItem> lineItems = camelRs.fromStream("lineItems", LineItem.class);

        return Flux.from(lineItems);
    }
}

The Bindy class:

import lombok.Getter;
import lombok.ToString;
import org.apache.camel.dataformat.bindy.annotation.CsvRecord;
import org.apache.camel.dataformat.bindy.annotation.DataField;

@ToString
@Getter
@CsvRecord(separator = ";", skipFirstLine = true, skipField = true)
public class LineItem {
    @DataField(pos = 2)
    private String description;
}

And the endpoint to expose the Flux:

@GetMapping(value = "/lineItems", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<LineItem> lineItems() {
    return lineItemFlux;
}
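
For completeness, lineItemFlux here is just the Flux obtained from the route bean above. A minimal sketch of how that could be wired (the controller class name and constructor injection are my assumptions, not code from the actual application):

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class LineItemController {

    private final Flux<LineItem> lineItemFlux;

    // Obtain the Flux once from the Camel route bean (hypothetical wiring)
    public LineItemController(TransactionLineCsvRoute route) {
        this.lineItemFlux = route.getLineItemFlux();
    }

    @GetMapping(value = "/lineItems", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<LineItem> lineItems() {
        return lineItemFlux;
    }
}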

So when I now do my curl:

curl localhost:8080/lineItems

I only get back the first line, whereas when I remove the ".unmarshal(bindy)" line (and refactor the stream to be of type String instead of LineItem) I get back all the elements of the CSV file.

So I guess I'm not using Bindy correctly within the reactive streams context. I followed this Camel documentation and tried to rewrite my route as follows:

from("file:input/?include=.*\.csv&move=successImport&moveFailed=failImport")
        .to("reactive-streams:rawLines");

from("reactive-streams:rawLines")
        .unmarshal(bindy)
        .to("reactive-streams:lineItems");

It shows the routes are started correctly:

2021-01-04 10:13:26.798  INFO 26438 --- [           main] o.a.camel.spring.SpringCamelContext      : Route: route1 started and consuming from: file://input/?include=.*%5C.csv&move=successImport&moveFailed=failImport
2021-01-04 10:13:26.800  INFO 26438 --- [           main] o.a.camel.spring.SpringCamelContext      : Route: route2 started and consuming from: reactive-streams://rawLines
2021-01-04 10:13:26.801  INFO 26438 --- [           main] o.a.camel.spring.SpringCamelContext      : Total 2 routes, of which 2 are started

But then I get an exception stating "The stream has no active subscriptions":

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route1            ] [route1            ] [file://input/?include=.*%5C.csv&move=successImport&moveFailed=failImport      ] [         9]
[route1            ] [to1               ] [reactive-streams:rawLines                                                     ] [         5]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------

java.lang.IllegalStateException: The stream has no active subscriptions
    at org.apache.camel.component.reactive.streams.engine.CamelPublisher.publish(CamelPublisher.java:108) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
    at org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService.sendCamelExchange(DefaultCamelReactiveStreamsService.java:144) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
    at org.apache.camel.component.reactive.streams.ReactiveStreamsProducer.process(ReactiveStreamsProducer.java:52) ~[camel-reactive-streams-2.25.3.jar:2.25.3]

Does anyone have any pointers on how I might use Bindy in combination with reactive streams? Thanks!

EDIT

After the very helpful post from burki I was able to fix my code. The route definition changed to the following; as you can see, I removed the unmarshal step, so it now just picks up files from the file system as they arrive and puts them on a reactive stream:

@Override
public void configure() {
    from("file:input/?include=.*\.csv&move=successImport&moveFailed=failImport")
            .to("reactive-streams:extractedFile");
}

And then expose the file stream as a Flux:

public Flux<File> getFileFlux() {
    return Flux.from(camelRs.fromStream("extractedFile", File.class));
}

And the code to parse the CSV is as follows (using OpenCSV as burki suggested, but using a different part of the API):

private Flux<LineItem> readLineItems() {
    return fileFlux
            .flatMap(file -> Flux.using(
                    // Resource supplier: build a Java Stream of beans from the CSV file
                    () -> new CsvToBeanBuilder<LineItem>(createFileReader(file))
                            .withSkipLines(1)
                            .withSeparator(';')
                            .withType(LineItem.class)
                            .build()
                            .stream(),
                    // Turn the Stream into a Flux...
                    Flux::fromStream,
                    // ...and close it once the Flux terminates
                    BaseStream::close)
            );
}

private FileReader createFileReader(File file) {
    System.out.println("Reading file from: " + file.getAbsolutePath());
    try {
        return new FileReader(file);
    } catch (FileNotFoundException e) {
        throw new RuntimeException(e);
    }
}
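
One thing to watch out for: CsvToBeanBuilder uses OpenCSV's own mapping annotations, not the Bindy ones shown earlier, so the LineItem class needs to be annotated accordingly. A minimal sketch of how the bean could look (my assumption; OpenCSV positions are zero-based, so Bindy's pos = 2 becomes position = 1):

import com.opencsv.bean.CsvBindByPosition;
import lombok.Getter;
import lombok.ToString;

@ToString
@Getter
public class LineItem {

    // Second column of the semicolon-separated file (zero-based position)
    @CsvBindByPosition(position = 1)
    private String description;
}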

You can now expose this resulting Flux as an endpoint:

@GetMapping(value = "/lineItems", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<LineItem> lineItems() {
    return readLineItems();
}

And now, when you run the curl command from above, you get back all of the unmarshalled LineItems from the CSV.

I still have a todo to check whether this actually loads the entire file into memory. I don't think so: I believe I only get a reference to the file, which I then stream into the OpenCSV bean, but I need to verify that. It could be that I'm first reading the entire file into memory and then streaming it, which would defeat the purpose.



1 Answer


I guess that the file consumer just passes the whole file to the unmarshalling step.

Therefore, if you unmarshal the result of the file consumer to a LineItem, you "reduce" the whole file content to the first line.

If, in contrast, you remove the unmarshalling, you get the whole file content, but the file consumer has probably loaded the whole file into memory before passing it on.

Reading the whole file at once is not what you want. To read the CSV file line by line, you need to split the file in streaming mode.

from("file:...")
    .split(body().tokenize(LINE_FEED)).streaming() // LINE_FEED is the line separator, e.g. "\n"
    .to("direct:processLine") 

This way, the Splitter sends every line to the direct:processLine route for further processing.

The problem I faced in this scenario was parsing a single CSV line. Most CSV libraries are designed to read and parse entire files, not single lines.

However, the rather old OpenCSV library has a CSVParser with a parseLine(String csvLine) method, so I used it to parse a "completely detached" single CSV line.
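
Put together, the splitting route and the per-line parsing could look roughly like this sketch (the route class, endpoint URI, and processor details are illustrative assumptions, not the exact code from my project):

import com.opencsv.CSVParser;
import com.opencsv.CSVParserBuilder;
import org.apache.camel.builder.RouteBuilder;

public class SplitAndParseRoute extends RouteBuilder {

    @Override
    public void configure() {
        from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
                // stream the file line by line instead of loading it completely
                .split(body().tokenize("\n")).streaming()
                .to("direct:processLine");

        from("direct:processLine")
                .process(exchange -> {
                    // parse a single, detached CSV line with OpenCSV
                    CSVParser parser = new CSVParserBuilder().withSeparator(';').build();
                    String[] fields = parser.parseLine(exchange.getIn().getBody(String.class));
                    exchange.getIn().setBody(fields);
                });
    }
}

Note that with this approach you have to deal with the header line yourself (for example by filtering it out in the per-line route), since the splitter has no equivalent of Bindy's skipFirstLine.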

