I have an RDD with 5 partitions and 5 workers/executors. How can I ask Spark to save each partition of the RDD on a different worker (IP)?
Am I right in saying that Spark can save several partitions on one worker and zero partitions on the others? That is, I can specify the number of partitions, but Spark can still cache everything on a single node.
Replication is not an option since the RDD is huge.
Workarounds I have found
getPreferredLocations
RDD's getPreferredLocations method does not provide a 100% guarantee that a partition will be stored on the specified node. Spark will try during spark.locality.wait, but afterward it will cache the partition on a different node.
As a workaround, you can set a very high value for spark.locality.wait and override getPreferredLocations. The bad news: you cannot do that from Java; you need to write Scala code, or at least wrap the Scala internals with Java code. For example:
import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

class NodeAffinityRDD[U: ClassTag](prev: RDD[U]) extends RDD[U](prev) {
  val nodeIPs = Array("192.168.2.140", "192.168.2.157", "192.168.2.77")
  // RDD's abstract members: delegate data and partitioning to the parent
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    prev.iterator(split, context)
  override protected def getPartitions: Array[Partition] = prev.partitions
  // Pin each partition to a node, round-robin over the IP list
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(nodeIPs(split.index % nodeIPs.length))
}
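A usage sketch for the class above (sc and the HDFS path are assumptions for illustration, not part of my setup):

// Hypothetical usage: wrap an existing RDD so each partition prefers a fixed node
val pinned = new NodeAffinityRDD(sc.textFile("hdfs:///some/input")).cache()
pinned.count() // materializes the cache, ideally on the preferred nodes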
SparkContext's makeRDD
SparkContext has a makeRDD method. This method lacks documentation. As I understand it, I can specify preferred locations and then set a high value for spark.locality.wait. The bad news: the preferred locations are discarded by the first shuffle/join/cogroup operation.
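For reference, a minimal sketch of that makeRDD overload, assuming an existing SparkContext sc (the element values and IPs here are placeholders): each element of the Seq becomes its own partition, paired with a list of preferred hosts.

// Sketch: one partition per element, each pinned to a preferred host
val rdd = sc.makeRDD(Seq(
  ("partition-0-data", Seq("192.168.2.140")),
  ("partition-1-data", Seq("192.168.2.157")),
  ("partition-2-data", Seq("192.168.2.77"))
))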
Both approaches share the drawback that too high a spark.locality.wait can cause your cluster to starve if some of the nodes become unavailable.
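For completeness, raising the timeout is just a configuration setting; the 10-minute value below is an arbitrary assumption, not a recommendation:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: make Spark wait a very long time for the preferred node
val conf = new SparkConf().set("spark.locality.wait", "10m")
val sc = new SparkContext(conf)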
P.S. More context
I have up to 10,000 sales-XXX.parquet files, each representing the sales of different goods in different regions. Each sales-XXX.parquet can vary from a few KB to a few GB. All the sales-XXX.parquet files together can take up tens or hundreds of GB on HDFS.
I need full-text search through all sales, so I have to index each sales-XXX.parquet one by one with Lucene. Now I have two options:
- Keep Lucene indexes in Spark. There is already a solution for this, but it looks pretty suspicious. Are there any better solutions?
- Keep Lucene indexes on the local file system. Then I can map-reduce over the results of each worker's index lookup. But this approach requires that each worker node keep an equal amount of data. How can I ensure that Spark keeps an equal amount of data on each worker node?