In general, you can use checkpoints to break long lineages. Something more or less like this should work:
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

val checkpointInterval: Int = ???

// Load one file and keep only matching lines, tagged with their source path.
def loadAndFilter(path: String) = sc.textFile(path)
  .filter(_.startsWith("#####"))
  .map((path, _))

// Union the next RDD into the accumulator and truncate the lineage
// with a local checkpoint every `interval` steps.
def mergeWithLocalCheckpoint[T: ClassTag](interval: Int)
    (acc: RDD[T], xi: (RDD[T], Int)) = {
  if (xi._2 % interval == 0 && xi._2 > 0) xi._1.union(acc).localCheckpoint()
  else xi._1.union(acc)
}

val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)]

fileList.map(loadAndFilter).zipWithIndex
  .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval))
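For completeness, here is a rough sketch of how the pieces above could be wired together in the spark-shell. The file paths and the interval of 10 are made-up placeholders, not values from the question:

// Hypothetical file list -- substitute your own paths.
val fileList = Seq("/data/logs/part-00000.txt", "/data/logs/part-00001.txt")

// Fold the per-file RDDs together, truncating the lineage every 10 files (arbitrary choice).
val foldedRDD = fileList.map(loadAndFilter).zipWithIndex
  .foldLeft(zero)(mergeWithLocalCheckpoint(10))

// Any action forces evaluation; count() is just an example.
println(foldedRDD.count())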
In this particular situation a much simpler solution is to use the SparkContext.union method:
val masterRDD = sc.union(
  fileList.map(path => sc.textFile(path)
    .filter(_.startsWith("#####"))
    .map((path, _)))
)
The difference between these approaches becomes obvious when you look at the DAGs: the loop / foldLeft version builds a long chain of nested, pairwise unions (periodically cut short by the local checkpoints), while SparkContext.union produces a single, flat union over all the input RDDs. (The DAG screenshots from the Spark UI are omitted here.)
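If you prefer to inspect the lineage without the Spark UI, RDD.toDebugString renders it as text. A quick sketch, assuming foldedRDD is the result of the foldLeft variant above and masterRDD the result of sc.union:

println(foldedRDD.toDebugString)  // long chain of nested UnionRDD nodes
println(masterRDD.toDebugString)  // a single flat UnionRDD over all input files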
Of course, if the files are small you can combine wholeTextFiles with flatMap and read all the files at once (wholeTextFiles loads each file as a single in-memory record, so it is only suitable for small files):
sc.wholeTextFiles(fileList.mkString(","))
  .flatMap { case (path, text) =>
    text.split("\n").filter(_.startsWith("#####")).map((path, _))
  }