Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
139 views
in Technique[技术] by (71.8m points)

java - Closing streams in the middle of pipelines

When I execute this code which opens a lot of files during a stream pipeline:

public static void main(String[] args) throws IOException {
    Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"),
            100, (path, attr) -> path.toString().endsWith(".html"))
        .map(file -> runtimizeException(() -> Files.lines(file, StandardCharsets.ISO_8859_1)))
        .map(Stream::count)
        .forEachOrdered(System.out::println);
}

I get an exception:

java.nio.file.FileSystemException: /long/file/name: Too many open files

The problem is that Stream.count does not close the stream when it is done traversing it. But I don't see why it shouldn't, given that it is a terminal operation. The same holds for other terminal operations such as reduce and forEach. flatMap on the other hand closes the streams it consists of.

The documentation tells me to use a try-with-resouces-statement to close streams if necessary. In my case I could replace the count line with something like this:

.map(s -> { long c = s.count(); s.close(); return c; } )

But that is noisy and ugly and could be a real inconvenience in some cases with big, complex pipelines.

So my questions are the following:

  1. Why were the streams not designed so that terminal operations close the streams they are working on? That would make them work better with IO streams.
  2. What is the best solution for closing IO streams in pipelines?

runtimizeException is a method that wraps checked exception in RuntimeExceptions.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

There are two issues here: handling of checked exceptions such as IOException, and timely closing of resources.

None of the predefined functional interfaces declare any checked exceptions, which means that they have to be handled within the lambda, or wrapped in an unchecked exception and rethrown. It looks like your runtimizeException function does that. You probably also had to declare your own functional interface for it. As you've probably discovered, this is a pain.

On the closing of resources like files, there was some investigation of having streams be closed automatically when the end of the stream was reached. This would be convenient, but it doesn't deal with closing when an exception is thrown. There's no magic do-the-right-thing mechanism for this in streams.

We're left with the standard Java techniques of dealing with resource closure, namely the try-with-resources construct introduced in Java 7. TWR really wants to have resources be closed at the same level in the call stack as they were opened. The principle of "whoever opens it has to close it" applies. TWR also deals with exception handling, which usually makes it convenient to deal with exception handling and resource closing in the same place.

In this example, the stream is somewhat unusual in that it maps a Stream<Path> to a Stream<Stream<String>>. These nested streams are the ones that aren't closed, resulting in the eventual exception when the system runs out of open file descriptors. What makes this difficult is that files are opened by one stream operation and then passed downstream; this makes it impossible to use TWR.

An alternative approach to structuring this pipeline is as follows.

The Files.lines call is the one that opens the file, so this has to be the resource in the TWR statement. The processing of this file is where (some) IOExceptions get thrown, so we can do the exception wrapping in the same TWR statement. This suggests having a simple function that maps the path to a line count, while handling resource closing and exception wrapping:

long lineCount(Path path) {
    try (Stream<String> s = Files.lines(path, StandardCharsets.ISO_8859_1)) {
        return s.count();
    } catch (IOException ioe) {
        throw new UncheckedIOException(ioe);
    }
}

Once you have this helper function, the main pipeline looks like this:

Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"),
           100, (path, attr) -> path.toString().endsWith(".html"))
     .mapToLong(this::lineCount)
     .forEachOrdered(System.out::println);

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...