There are few important differences but the fundamental one is what happens with lineage. Persist
/ cache
keeps lineage intact while checkpoint
breaks lineage. Lets consider following examples:
import org.apache.spark.storage.StorageLevel
val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)
cache
/ persist
:
val indCache = rdd.mapValues(_ > 4)
indCache.persist(StorageLevel.DISK_ONLY)
indCache.toDebugString
// (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
// | ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
// +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
// | ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
indCache.count
// 3
indCache.toDebugString
// (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
// | CachedPartitions: 8; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 587.0 B
// | ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
// +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
// | ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
checkpoint
:
val indChk = rdd.mapValues(_ > 4)
indChk.checkpoint
indChk.toDebugString
// (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
// | ShuffledRDD[3] at reduceByKey at <console>:21 []
// +-(8) MapPartitionsRDD[2] at map at <console>:21 []
// | ParallelCollectionRDD[1] at parallelize at <console>:21 []
indChk.count
// 3
indChk.toDebugString
// (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
// | ReliableCheckpointRDD[12] at count at <console>:27 []
As you can see, in the first case lineage is preserved even if data is fetched from the cache. It means that data can be recomputed from scratch if some partitions of indCache
are lost. In the second case lineage is completely lost after the checkpoint and indChk
doesn't carry an information required to rebuild it anymore.
checkpoint
, unlike cache
/ persist
is computed separately from other jobs. That's why RDD marked for checkpointing should be cached:
It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.
Finally checkpointed
data is persistent and not removed after SparkContext
is destroyed.
Regarding data storage SparkContext.setCheckpointDir
used by RDD.checkpoint
requires DFS
path if running in non-local mode. Otherwise it can be local files system as well. localCheckpoint
and persist
without replication should use local file system.
Important Note:
RDD checkpointing is a different concept than a chekpointing in Spark Streaming. The former one is designed to address lineage issue, the latter one is all about streaming reliability and failure recovery.
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…