saveAsTextFile does not support append. If called with a fixed filename, it overwrites the existing output every time.
We could call saveAsTextFile(path + timestamp) to save to a new file on every batch interval. That is, in essence, the basic functionality of DStream.saveAsTextFiles(prefix, suffix), which writes each batch's RDD under a timestamped path of the form prefix-<time_in_ms>.suffix.
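For illustration, here is a minimal sketch of that timestamped variant; the socket source, batch interval, and output prefix are assumptions made for the example, not part of the original:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// A minimal sketch, assuming a local run with a hypothetical socket source.
val conf = new SparkConf().setMaster("local[2]").setAppName("TimestampedTextOutput")
val ssc = new StreamingContext(conf, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999) // hypothetical text source

// Each batch is written under a new timestamped path: out/batch-<time_in_ms>.txt
lines.saveAsTextFiles("out/batch", "txt")

ssc.start()
ssc.awaitTermination()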
An easily accessible format that does support appending is Parquet: we first transform our data RDD into a DataFrame or Dataset, and then benefit from the write support offered on top of that abstraction.
case class DataStructure(field1, ..., fieldn)

... streaming setup, dstream declaration, ...

// Map each raw record into the structured representation.
val structuredOutput = outputDStream.map(record => mapFunctionRecordToDataStructure)

structuredOutput.foreachRDD { rdd =>
  import sparkSession.implicits._
  val df = rdd.toDF()
  // Append this batch as Parquet files under the target location.
  df.write.format("parquet").mode("append").save(s"$workDir/$targetFile")
}
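To make the pattern above concrete, here is a self-contained sketch; the SensorReading case class, the "id,value" record format, the socket source, and the output path are hypothetical stand-ins:

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical structured record type standing in for DataStructure.
case class SensorReading(sensorId: String, value: Double)

val sparkSession = SparkSession.builder()
  .master("local[2]")
  .appName("ParquetAppendSink")
  .getOrCreate()
val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(10))

val outputDStream = ssc.socketTextStream("localhost", 9999) // hypothetical source

// Parse each "id,value" line into the structured record type.
val structuredOutput = outputDStream.map { record =>
  val Array(id, value) = record.split(",")
  SensorReading(id, value.toDouble)
}

structuredOutput.foreachRDD { rdd =>
  import sparkSession.implicits._
  val df = rdd.toDF()
  // Each batch appends new Parquet part files under the target directory.
  df.write.format("parquet").mode("append").save("/tmp/sensor-data/readings.parquet")
}

ssc.start()
ssc.awaitTermination()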
Note that appending to Parquet files gets more expensive over time, so rotating the target file from time to time is still a requirement.
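One possible rotation scheme (an assumption on our part, not something prescribed above) is to derive the target file name from a time bucket, so that each hour's appends go to a fresh Parquet destination and no single one grows without bound:

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical rotation: bucket the output path by hour.
val hourBucket = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH")

structuredOutput.foreachRDD { rdd =>
  import sparkSession.implicits._
  val targetFile = s"readings-${LocalDateTime.now.format(hourBucket)}.parquet"
  rdd.toDF().write.format("parquet").mode("append").save(s"$workDir/$targetFile")
}

A production version would more likely derive the bucket from the batch time, which the foreachRDD((rdd, time) => ...) overload exposes, rather than from the driver's wall clock.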