Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
115 views
in Technique[技术] by (71.8m points)

java - Enforce partition be stored on the specific executor

I have 5-partitions-RDD and 5 workers/executors. How can I ask Spark to save each RDD's partition on the different worker (IP)?

Am I right if I say Spark can save few partitions on one worker, and 0 partitions on other workers? Means, I can specify the number of partitions, but Spark still can cache everything on a single node.

Replication is not an option since RDD is huge.

Workarounds I have found

getPreferredLocations

RDD's getPreferredLocations method does not provide a 100% warranty that partition will be stored on a specified node. Spark will try during spark.locality.wait, but afterward, Spark will cache partition on a different node.

As a workarround, you can set very high value to spark.locality.wait and override getPreferredLocations. The bad news - you can not do that with Java, you need to write Scala code. At least Scala internals wrapped with Java code. I.e:

class NodeAffinityRDD[U: ClassTag](prev: RDD[U]) extends RDD[U](prev) {

  val nodeIPs = Array("192.168.2.140","192.168.2.157","192.168.2.77")

  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(nodeIPs(split.index % nodeIPs.length))
}

SparkContext's makeRDD

SparkContext has makeRDD method. This method lack documentation. As I understand, I can specify preferred locations, and then set a high value to spark.locality.wait. The bad news - preferred location will be discarded on the first shuffle/join/cogroup operation.


Both approaches have the drawback of too high spark.locality.wait can cause your cluster to starve if some of the nodes will be unavailable.

P.S. More context

I have up to 10,000 of sales-XXX.parquet files, each represents sales of different goods in the different regions. Each sales-XXX.parquet could vary from a few KBs to a few GBs. All sales-XXX.parquets together could take up to tens or hundreds of GBs at HDFS. I need a full-text search through all sales. I have to index each sales-XXX.parquet one-by-one with Lucene. And now I have two options:

  1. Keep Lucene indexes in Spark. There is already solution for this, but it looks pretty suspicious. Is there any better solutions?
  2. Keep Lucene indexes at the local file system. Then I can map-reduce on the results of each worker's index lookup. But this approach requires each worker node keeps an equal amount of data. How could I ensure Spark will keep equal amount of data on each worker node?
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)
Waitting for answers

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...