Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

apache spark - Number of dataframe partitions after sorting?

How does Spark determine the number of partitions after an orderBy? I always thought that the resulting DataFrame has spark.sql.shuffle.partitions partitions, but this does not seem to be true:

val df = (1 to 10000).map(i => ("a",i)).toDF("n","i").repartition(10).cache

df.orderBy($"i").rdd.getNumPartitions // = 200 (=spark.sql.shuffle.partitions)
df.orderBy($"n").rdd.getNumPartitions // = 2 

In both cases, the physical plan contains +- Exchange rangepartitioning(i/n ASC NULLS FIRST, 200), so how can the resulting number of partitions in the second case be 2?

Whoever battles dragons too long becomes a dragon himself; gaze too long into the abyss, and the abyss will gaze back…

1 Answer

spark.sql.shuffle.partitions is used as an upper bound. The final number of partitions satisfies 1 <= partitions <= spark.sql.shuffle.partitions.


As you've mentioned, sorting in Spark goes through RangePartitioner. It tries to split your dataset into the specified number (spark.sql.shuffle.partitions) of roughly equal ranges.

There's a guarantee that equal values will be in the same partition after the partitioning. It's worth checking the documentation of the RangePartitioning class (not part of the public API):

...

All row where the expressions in ordering evaluate to the same values will be in the same partition

And if the number of distinct ordering values is less than the desired number of partitions, i.e. the number of possible ranges is less than spark.sql.shuffle.partitions, you'll end up with a smaller number of partitions. Also, here's a quote from the RangePartitioner Scaladoc:

The actual number of partitions created by the RangePartitioner might not be the same as the partitions parameter, in the case where the number of sampled records is less than the value of partitions.

Going back to your example, n is a constant ("a") and cannot be split into ranges. On the other hand, i can take 10,000 possible values and is partitioned into 200 (= spark.sql.shuffle.partitions) ranges or partitions.
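To see why the constant column ends up with 2 partitions rather than 1, here is a minimal sketch in plain Scala (no Spark needed) of how range bounds can be picked from a sorted sample. It is a simplified, unweighted imitation of the idea behind RangePartitioner, not the actual Spark code; the names RangeBoundsSketch, determineBounds and numPartitions are made up for this illustration. It assumes that duplicate bounds are skipped and that the partition count is the number of bounds plus one.

```scala
object RangeBoundsSketch {
  // Pick up to (partitions - 1) upper bounds from a sample of the sort keys.
  // Assumption: a candidate equal to the previous bound is skipped, so a
  // column with few distinct values yields few bounds.
  def determineBounds[K](sample: Seq[K], partitions: Int)(implicit ord: Ordering[K]): Vector[K] = {
    val sorted = sample.toVector.sorted
    val step = sorted.size.toDouble / partitions // ideal number of keys per range
    var target = step
    var bounds = Vector.empty[K]
    var i = 0
    while (i < sorted.size && bounds.size < partitions - 1) {
      val key = sorted(i)
      // Enough keys seen for the next range, and key is strictly above the last bound?
      if (i + 1 >= target && bounds.lastOption.forall(ord.lt(_, key))) {
        bounds :+= key
        target += step
      }
      i += 1
    }
    bounds
  }

  // Assumption: the number of partitions is the number of bounds plus one.
  def numPartitions[K: Ordering](sample: Seq[K], partitions: Int): Int =
    determineBounds(sample, partitions).size + 1

  def main(args: Array[String]): Unit = {
    println(numPartitions(Seq.fill(10000)("a"), 200)) // 2: a single bound "a"
    println(numPartitions(1 to 10000, 200))           // 200: all 199 bounds found
  }
}
```

With a single distinct key only one bound survives, which gives 2 partitions (the second one holding no rows in this sketch). With 10,000 distinct keys all 199 bounds are found, which gives the full 200.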

Note that this applies only to the DataFrame/Dataset API. With an RDD's sortByKey, you either specify the number of partitions explicitly or Spark uses the RDD's current number of partitions.
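A minimal sketch of the RDD side, assuming spark-core is on the classpath and a local master is acceptable; the object name SortByKeyPartitions and the helper partitionCounts are hypothetical. The second parameter of sortByKey, numPartitions, defaults to the RDD's current number of partitions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SortByKeyPartitions {
  // Returns (partitions with the default, partitions with an explicit value).
  def partitionCounts(sc: SparkContext): (Int, Int) = {
    // 10,000 distinct keys spread over 10 input partitions.
    val pairs = sc.parallelize(1 to 10000, numSlices = 10).map(i => (i, "a"))

    // No numPartitions given: sortByKey falls back to the current partition count.
    val sortedDefault = pairs.sortByKey()

    // Explicit numPartitions: the sorted RDD is range-partitioned into 4 parts.
    val sortedExplicit = pairs.sortByKey(ascending = true, numPartitions = 4)

    (sortedDefault.getNumPartitions, sortedExplicit.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("sortByKey-partitions")
    val sc = new SparkContext(conf)
    try println(partitionCounts(sc))
    finally sc.stop()
  }
}
```

Here spark.sql.shuffle.partitions plays no role, since no Spark SQL shuffle is involved.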
