Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


apache spark - update a dataframe column with new values

df1 has fields id and json; df2 has fields id and json.

df1.count() => 1200; df2.count() => 20

df1 has all the rows. df2 has an incremental update with just 20 rows.

My goal is to update df1 with the values from df2. All the ids in df2 are also in df1, but df2 has updated values (in the json field) for those ids.

The resulting DataFrame should have all the rows from df1, with the json values replaced by those from df2 wherever an id matches.

What is the best way to do this, with the fewest joins and filters?

Thanks!



1 Answer


You can achieve this using one left join.

Create Example DataFrames

Using the sample data provided by @Shankar Koirala in his answer.

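# sqlCtx is assumed to be an existing SQLContext; on Spark 2.x and later,
# a SparkSession exposes the same createDataFrame method.
data1 = [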
  (1, "a"),
  (2, "b"),
  (3, "c")
]
df1 = sqlCtx.createDataFrame(data1, ["id", "value"])

data2 = [
  (1, "x"), 
  (2, "y")
]

df2 = sqlCtx.createDataFrame(data2, ["id", "value"])

Do a left join

Join the two DataFrames using a left join on the id column. This keeps all of the rows in the left DataFrame. For rows in the left DataFrame that have no matching id in the right DataFrame, the right-hand value will be null.

import pyspark.sql.functions as f

# Parentheses let the method chain span several lines.
(
    df1.alias('l').join(df2.alias('r'), on='id', how='left')
    .select(
        'id',
        f.col('l.value').alias('left_value'),
        f.col('r.value').alias('right_value')
    )
    .show()
)
#+---+----------+-----------+
#| id|left_value|right_value|
#+---+----------+-----------+
#|  1|         a|          x|
#|  3|         c|       null|
#|  2|         b|          y|
#+---+----------+-----------+
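
The null in right_value for id 3 is what the next step relies on. As a rough illustration of the left-join semantics only, here is a plain-Python sketch (not Spark code; the names left_rows, right_rows, and left_join are hypothetical) in which every left row is kept and a missing right-hand match becomes None:

```python
# Plain-Python model of a left join on id; illustrative only, not Spark.
left_rows = [(1, "a"), (2, "b"), (3, "c")]
right_rows = [(1, "x"), (2, "y")]

def left_join(left, right):
    """Keep every left row; pair it with the right value, or None if unmatched."""
    right_lookup = dict(right)  # assumes ids are unique on the right side
    return [(i, lv, right_lookup.get(i)) for i, lv in left]

joined = left_join(left_rows, right_rows)
print(joined)
```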

Select the desired data

We will use the fact that the unmatched ids have a null to select the final columns. Use pyspark.sql.functions.when() to use the right value if it is not null, otherwise keep the left value.

(
    df1.alias('l').join(df2.alias('r'), on='id', how='left')
    .select(
        'id',
        # Use the right value when it is not null, otherwise keep the left value.
        f.when(
            ~f.isnull(f.col('r.value')),
            f.col('r.value')
        ).otherwise(f.col('l.value')).alias('value')
    )
    .show()
)
#+---+-----+
#| id|value|
#+---+-----+
#|  1|    x|
#|  3|    c|
#|  2|    y|
#+---+-----+

You can sort this output if you want the ids in order.


Using pyspark-sql

You can do the same thing using a pyspark-sql query:

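# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it there.
df1.registerTempTable('df1')
df2.registerTempTable('df2')

query = """SELECT l.id,
CASE WHEN r.value IS NOT NULL THEN r.value ELSE l.value END AS value
FROM df1 l LEFT JOIN df2 r ON l.id = r.id"""
# Flatten the multi-line string into a single line before running it.
sqlCtx.sql(query.replace("\n", " ")).show()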
#+---+-----+
#| id|value|
#+---+-----+
#|  1|    x|
#|  3|    c|
#|  2|    y|
#+---+-----+
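
The CASE WHEN ... END expression is equivalent to COALESCE(r.value, l.value), which Spark SQL also supports. Because the query is plain SQL, its logic can be checked without a cluster. A minimal sketch using Python's built-in sqlite3 module as a stand-in for Spark (an assumption for illustration only; the in-memory tables mirror the sample data, and ORDER BY is added to make the row order deterministic):

```python
import sqlite3

# In-memory tables mirroring df1 and df2 from the example above.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE df1 (id INTEGER, value TEXT)")
conn.execute("CREATE TABLE df2 (id INTEGER, value TEXT)")
conn.executemany("INSERT INTO df1 VALUES (?, ?)", [(1, "a"), (2, "b"), (3, "c")])
conn.executemany("INSERT INTO df2 VALUES (?, ?)", [(1, "x"), (2, "y")])

query = """
SELECT l.id, COALESCE(r.value, l.value) AS value
FROM df1 l LEFT JOIN df2 r ON l.id = r.id
ORDER BY l.id
"""
rows = conn.execute(query).fetchall()
print(rows)
conn.close()
```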
