I am exploring reactive programming and RxJava. It is fun, but I am stuck on a problem for which I cannot find an answer. My basic question: what is a reactive-appropriate way to terminate an otherwise infinitely-running Observable? I also welcome critiques and reactive best practices regarding my code.
As an exercise, I am writing a log file tail utility. The stream of lines in the log file is represented by an Observable<String>. To get the BufferedReader to continue reading text that is added to the file, I ignore the usual reader.readLine() == null termination check and instead interpret it to mean that my thread should sleep and wait for more logger text.
But while I can terminate the Observer using takeUntil, I need to find a clean way to terminate the otherwise infinitely-running file watcher. I can write my own terminateWatcher method/field, but that breaks the Observable/Observer encapsulation, and I'd like to stay as strict to the reactive paradigm as possible.
Here is the Observable<String> code:
public class FileWatcher implements OnSubscribeFunc<String> {
    private Path path = . . .;

    @Override
    // The <? super String> generic is pointless but required by the compiler
    public Subscription onSubscribe(Observer<? super String> observer) {
        try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
            String newLine = "";
            while (!Thread.interrupted()) { // How do I terminate this reactively?
                if ((newLine = reader.readLine()) != null)
                    observer.onNext(newLine);
                else
                    try {
                        // Wait for more text
                        Thread.sleep(250);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
            }
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }
        return null; // Not sure what Subscription I should return
    }
}
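To make the question more concrete, here is a rough sketch of the kind of cancellation I have in mind: the read loop runs on its own thread and watches a flag owned by the handle that gets returned to the subscriber. This is plain Java, not RxJava. `CancellableTail`, `Handle`, and `tail` are hypothetical names I made up as stand-ins for the Observable/Subscription machinery, and the 50 ms poll interval is arbitrary. I do not know whether this is how RxJava expects it to be done.

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class CancellableTail {

    /** Hypothetical stand-in for an Rx-style Subscription; NOT the RxJava type. */
    public interface Handle {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /**
     * Starts tailing 'path' on a daemon thread and returns immediately.
     * The read loop ends once the returned Handle is unsubscribed.
     */
    public static Handle tail(Path path, Consumer<String> onNext, Consumer<Throwable> onError) {
        final AtomicBoolean unsubscribed = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
                while (!unsubscribed.get()) {
                    String line = reader.readLine();
                    if (line != null) {
                        onNext.accept(line);
                    } else {
                        Thread.sleep(50); // end of file for now: wait for more text
                    }
                }
                // Unsubscribed: leave quietly, sending no further notifications.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status and stop
            } catch (IOException e) {
                if (!unsubscribed.get()) {
                    onError.accept(e);
                }
            }
        }, "tail-worker");
        worker.setDaemon(true);
        worker.start();

        return new Handle() {
            @Override public void unsubscribe() { unsubscribed.set(true); }
            @Override public boolean isUnsubscribed() { return unsubscribed.get(); }
        };
    }
}
```

With something like this, the caller would keep the returned handle and call `unsubscribe()` when it is done; the worker notices the flag within one poll interval and closes the reader through try-with-resources. What I cannot tell is how to express the same thing with the real `Subscription` that `onSubscribe` is supposed to return.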
Here is the Observer code that prints the new lines as they come:
public static void main(String... args) {
    . . .
    Observable<String> lines = Observable.create(createWatcher(file));
    lines = lines.takeWhile(new Func1<String, Boolean>() {
        @Override
        public Boolean call(String line) {
            // Predicate for which to continue processing
            return !line.contains("shutdown");
        }
    }).subscribeOn(Schedulers.threadPoolForIO())
      .observeOn(Schedulers.currentThread());
    // Seems like I should use subscribeOn() and observeOn(), but they
    // make my tailer terminate without reading any text.
    Subscription subscription = lines.subscribe(new Action1<String>() {
        @Override
        public void call(String line) {
            System.out.printf("%20s%s\n", file, line);
        }
    });
}
My two questions are:
- What is a reactive-consistent way to terminate an otherwise infinitely-running stream?
- What other mistakes in my code make you cry? :)