Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
450 views
in Technique[技术] by (71.8m points)

partitioning - In Apache Spark, why does RDD.union not preserve the partitioner?

As everyone knows partitioners in Spark have a huge performance impact on any "wide" operations, so it's usually customized in operations. I was experimenting with the following code:

val rdd1 =
  sc.parallelize(1 to 50).keyBy(_ % 10)
    .partitionBy(new HashPartitioner(10))
val rdd2 =
  sc.parallelize(200 to 230).keyBy(_ % 13)

val cogrouped = rdd1.cogroup(rdd2)
println("cogrouped: " + cogrouped.partitioner)

val unioned = rdd1.union(rdd2)
println("union: " + unioned.partitioner)

I see that by default cogroup() always yields an RDD with the customized partitioner, but union() doesn't, it will always revert back to default. This is counterintuitive as we usually assume that a PairRDD should use its first element as partition key. Is there a way to "force" Spark to merge 2 PairRDDs to use the same partition key?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

union is a very efficient operation, because it doesn't move any data around. If rdd1 has 10 partitions and rdd2 has 20 partitions then rdd1.union(rdd2) will have 30 partitions: the partitions of the two RDDs put after each other. This is just a bookkeeping change, there is no shuffle.

But necessarily it discards the partitioner. A partitioner is constructed for a given number of partitions. The resulting RDD has a number of partitions that is different from both rdd1 and rdd2.

After taking the union you can run repartition to shuffle the data and organize it by key.


There is one exception to the above. If rdd1 and rdd2 have the same partitioner (with the same number of partitions), union behaves differently. It will join the partitions of the two RDDs pairwise, giving it the same number of partitions as each of the inputs had. This may involve moving data around (if the partitions were not co-located) but will not involve a shuffle. In this case the partitioner is retained. (The code for this is in PartitionerAwareUnionRDD.scala.)


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...