
PySpark: forward fill with last observation for a DataFrame

Using Spark 1.5.1, I've been trying to forward fill null values with the last known observation for one column of my DataFrame.

The column may start with a null value, in which case I would like to backward fill it with the first known observation. If that complicates the code too much, this point can be skipped.

In this post, a solution in Scala was provided for a very similar problem by zero323.

But I don't know Scala and haven't managed to "translate" it into the PySpark API. Is it possible to do this with PySpark?

Thanks for your help.

Below is a simple sample input:

| cookie_ID | Time       | User_ID |
|-----------|------------|---------|
| 1         | 2015-12-01 | null    |
| 1         | 2015-12-02 | U1      |
| 1         | 2015-12-03 | U1      |
| 1         | 2015-12-04 | null    |
| 1         | 2015-12-05 | null    |
| 1         | 2015-12-06 | U2      |
| 1         | 2015-12-07 | null    |
| 1         | 2015-12-08 | U1      |
| 1         | 2015-12-09 | null    |
| 2         | 2015-12-03 | null    |
| 2         | 2015-12-04 | U3      |
| 2         | 2015-12-05 | null    |
| 2         | 2015-12-06 | U4      |

And the expected output:

| cookie_ID | Time       | User_ID |
|-----------|------------|---------|
| 1         | 2015-12-01 | U1      |
| 1         | 2015-12-02 | U1      |
| 1         | 2015-12-03 | U1      |
| 1         | 2015-12-04 | U1      |
| 1         | 2015-12-05 | U1      |
| 1         | 2015-12-06 | U2      |
| 1         | 2015-12-07 | U2      |
| 1         | 2015-12-08 | U1      |
| 1         | 2015-12-09 | U1      |
| 2         | 2015-12-03 | U3      |
| 2         | 2015-12-04 | U3      |
| 2         | 2015-12-05 | U3      |
| 2         | 2015-12-06 | U4      |

1 Answer


Below is partitioned example code, adapted from Spark / Scala: forward fill with last observation. It only works for data that can be partitioned (here, by cookie_id), because each group is collected and processed in memory.

Load the data

# sc is the SparkContext predefined by the PySpark 1.x shell
values = [
    (1, "2015-12-01", None),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-03", "U2"),
    (1, "2015-12-04", None),
    (1, "2015-12-05", None),
    (2, "2015-12-04", None),
    (2, "2015-12-03", None),
    (2, "2015-12-02", "U3"),
    (2, "2015-12-05", None),
]
rdd = sc.parallelize(values)
df = rdd.toDF(["cookie_id", "c_date", "user_id"])
df = df.withColumn("c_date", df.c_date.cast("date"))  # cast the string to DateType
df.show()

The DataFrame is

+---------+----------+-------+
|cookie_id|    c_date|user_id|
+---------+----------+-------+
|        1|2015-12-01|   null|
|        1|2015-12-02|     U1|
|        1|2015-12-02|     U1|
|        1|2015-12-03|     U2|
|        1|2015-12-04|   null|
|        1|2015-12-05|   null|
|        2|2015-12-04|   null|
|        2|2015-12-03|   null|
|        2|2015-12-02|     U3|
|        2|2015-12-05|   null|
+---------+----------+-------+
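
As an aside, the snippet above relies on the sc and sqlContext variables predefined by the Spark 1.x shell. On Spark 2.x these entry points are wrapped by SparkSession; a minimal sketch of the equivalent setup (the app name is illustrative):

# Assumption: Spark 2.x, where SparkSession replaces the separate
# SparkContext / SQLContext entry points of the 1.x shell.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("forward-fill").getOrCreate()
sc = spark.sparkContext   # exposes the same parallelize() API used above
sqlContext = spark        # SparkSession also provides createDataFrame()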

The sort key used to order rows by date within each group

# get the sort key
def getKey(item):
    return item.c_date

The fill function, which walks the sorted rows and carries the last non-null user_id forward. It can be extended to fill multiple columns if necessary.

# fill function: carry the last non-null user_id forward into null slots
def fill(x):
    out = []
    last_val = None
    for v in x:
        if v["user_id"] is None:
            data = [v["cookie_id"], v["c_date"], last_val]    # reuse last seen value
        else:
            data = [v["cookie_id"], v["c_date"], v["user_id"]]
            last_val = v["user_id"]                           # remember the new value
        out.append(data)
    return out
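
Note that this leaves a leading null in place (visible in the output below). If the question's backward-fill point matters, one possible extension is a fill_bfill helper (hypothetical, not part of the original answer) that runs a backward pass after the forward one; it can be swapped in for fill in the mapValues step below:

# hypothetical extension: after the forward pass, backfill any leading
# nulls with the first known observation in the (already sorted) group
def fill_bfill(x):
    out = fill(x)  # forward fill as above
    first_val = next((row[2] for row in out if row[2] is not None), None)
    for row in out:
        if row[2] is not None:
            break
        row[2] = first_val  # index 2 is user_id
    return out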

Convert to an RDD; group by cookie, sort each group by date, and fill the missing values. Note that groupBy collects each group's rows onto a single executor, so every group must fit in memory.

# Partition the data
rdd = df.rdd.groupBy(lambda x: x.cookie_id).mapValues(list)
# Sort the data by date
rdd = rdd.mapValues(lambda x: sorted(x, key=getKey))
# fill missing value and flatten
rdd = rdd.mapValues(fill).flatMapValues(lambda x: x)
# discard the key
rdd = rdd.map(lambda v: v[1])

Convert back to DataFrame

df_out = sqlContext.createDataFrame(rdd, ["cookie_id", "c_date", "user_id"])
df_out.show()

The output is

+---------+----------+-------+
|cookie_id|    c_date|user_id|
+---------+----------+-------+
|        1|2015-12-01|   null|
|        1|2015-12-02|     U1|
|        1|2015-12-02|     U1|
|        1|2015-12-03|     U2|
|        1|2015-12-04|     U2|
|        1|2015-12-05|     U2|
|        2|2015-12-02|     U3|
|        2|2015-12-03|     U3|
|        2|2015-12-04|     U3|
|        2|2015-12-05|     U3|
+---------+----------+-------+
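
For reference, on Spark 2.1 or later the same result can be obtained without dropping to RDDs, using window functions with the ignorenulls flag; a minimal sketch, assuming the df built above:

# Assumption: Spark 2.1+, which provides Window.currentRow /
# Window.unboundedPreceding / Window.unboundedFollowing and the
# ignorenulls flag on last() and first().
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# forward fill: last non-null user_id from the start of the group up to here
w_ffill = (Window.partitionBy("cookie_id").orderBy("c_date")
           .rowsBetween(Window.unboundedPreceding, Window.currentRow))
# backward fill: first non-null user_id from here to the end of the group
w_bfill = (Window.partitionBy("cookie_id").orderBy("c_date")
           .rowsBetween(Window.currentRow, Window.unboundedFollowing))

df_filled = (df
             .withColumn("user_id", F.last("user_id", ignorenulls=True).over(w_ffill))
             # optional second pass for leading nulls, as the question asks
             .withColumn("user_id", F.first("user_id", ignorenulls=True).over(w_bfill)))
df_filled.show()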
