
pyspark - How to divide a column by its sum in a Spark DataFrame

How can I divide a column by its own sum in a Spark DataFrame, efficiently and without immediately triggering a computation?

Suppose we have some data:

import pyspark
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as spf

spark = SparkSession.builder.master('local').getOrCreate()

data = spark.range(0, 100)

data # --> DataFrame[id: bigint]

I’d like to create a new column on this data frame called “normalized” that contains id / sum(id). One way to do it is to pre-compute the sum, like this:

s = data.select(spf.sum('id')).collect()[0][0]
data2 = data.withColumn('normalized', spf.col('id') / s)
data2 # --> DataFrame[id: bigint, normalized: double]

That works fine, but it immediately triggers a computation; if you're defining something similar for many columns it will cause multiple redundant passes over the data.
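
To make the concern concrete, here is a hypothetical sketch (the column list is assumed) in which each collect() launches a separate job over the same data:

# Hypothetical: each collect() below is a separate pass over the data
cols = ['id']  # imagine several numeric columns here
sums = {c: data.select(spf.sum(c)).collect()[0][0] for c in cols}
for c in cols:
    data = data.withColumn(c + '_normalized', spf.col(c) / sums[c])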

Another way to do it is with a windowing specification that includes the whole table:

w = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
data3 = data.withColumn('normalized', spf.col('id') / spf.sum('id').over(w))
data3 # --> DataFrame[id: bigint, normalized: double]

In this case, it's fine to define data3, but once you try to actually compute it, Spark 2.2.0 will move all the data into a single partition, which typically causes the job to fail for large data sets.
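
You can see this in the physical plan (the exact plan text varies by Spark version): the whole-table window forces a single-partition exchange.

data3.explain()
# The plan includes a line like:
#   +- Exchange SinglePartition
# and at runtime Spark logs a warning along the lines of:
#   WARN WindowExec: No Partition Defined for Window operation!
#   Moving all data to a single partition, this can cause
#   serious performance degradation.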

What other approaches are there that solve this problem without triggering an immediate computation and that work with large data sets? I'm interested in any solutions, not necessarily solutions based on pyspark.


1 Answer


A crossJoin with the aggregated sum is one approach:

data.crossJoin( 
    data.select(spf.sum('id').alias("sum_id"))
).withColumn("normalized", spf.col("id") / spf.col("sum_id"))
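
Because the aggregate side is a single row, you can additionally hint a broadcast so the cross join never shuffles the large side (a sketch; Spark will usually broadcast such a tiny side on its own under spark.sql.autoBroadcastJoinThreshold):

data.crossJoin(
    spf.broadcast(data.select(spf.sum("id").alias("sum_id")))  # broadcast the 1-row aggregate
).withColumn("normalized", spf.col("id") / spf.col("sum_id"))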

but I wouldn't worry too much about this:

"That works fine, but it immediately triggers a computation; if you're defining something similar for many columns it will cause multiple redundant passes over the data."

Just compute multiple statistics at once:

data2 = data.select(spf.rand(42).alias("x"), spf.randn(42).alias("y"))
mean_x, mean_y = data2.groupBy().mean().first()

and the rest is just an operation on local expressions:

data2.select(spf.col("x") - mean_x, spf.col("y") - mean_y)
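
Applied back to the original question, the same idea computes every needed sum in one job and keeps the division itself lazy; a sketch assuming all columns are numeric:

# One job computes all the column sums at once
agg_row = data.agg(*[spf.sum(c).alias(c) for c in data.columns]).first()
# The normalized columns are then purely local expressions
data.select(*[(spf.col(c) / agg_row[c]).alias(c + '_normalized') for c in data.columns])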
