Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.3k views
in Technique[技术] by (71.8m points)

dataframe - Compare two values with Scala Spark

I got the next parquet file:

+--------------+------------+-------+
|gf_cutoff     | country_id |gf_mlt |
+--------------+------------+-------+
|2020-12-14    |DZ          |5      |
|2020-08-06    |DZ          |4      |
|2020-07-03    |DZ          |4      |
|2020-12-14    |LT          |1      |
|2020-08-06    |LT          |1      |
|2020-07-03    |LT          |1      |

As you can see is particioned by country_id and ordered by gf_cutoff DESC. What I want to do es compare gf_mlt to check if the value has changed. To do that I want to compare the most recently gf_cutoff with the second one.

A example of this case would be compare:

 2020-12-14 DZ 5
with
 2020-08-06 DZ 4

And I want to write in a new column, if the value of the most recent date is different of the second row, put in a new column, the most recent value that is 5 for DZ and put in another column True if the value has changed or false if has not changed. Afther did this comparation, delete the rows with the older rows.

For DZ has changed and for LT hasn't changed because is all time 1.

So the output would be like this:

+--------------+------------+-------+------------+-----------+
|gf_cutoff     | country_id |gf_mlt | Has_change | old_value |
+--------------+------------+-------+------------+-----------+
|2020-12-14    |DZ          |5      |    True    |     4     |
|2020-12-14    |LT          |1      |    False   |     1     |

If you need more explanation, just tell me it.

question from:https://stackoverflow.com/questions/65842685/compare-two-values-with-scala-spark

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

You can use lag over an appropriate window to get the most recent value, and then filter the most recent rows using a row_number over another appropriate window:

import org.apache.spark.sql.expressions.Window

val df2 = df.withColumn(
    "last_value",
    lag("gf_mlt", 1).over(Window.partitionBy("country_id").orderBy("gf_cutoff"))
).withColumn(
    "rn", 
    row_number().over(Window.partitionBy("country_id").orderBy(desc("gf_cutoff")))
).filter("rn = 1").withColumn(
    "changed",
    $"gf_mlt" === $"last_value"
).drop("rn")

df2.show
+----------+----------+------+----------+-------+
| gf_cutoff|country_id|gf_mlt|last_value|changed|
+----------+----------+------+----------+-------+
|2020-12-14|        DZ|     5|         4|  false|
|2020-12-14|        LT|     1|         1|   true|
+----------+----------+------+----------+-------+

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

2.1m questions

2.1m answers

60 comments

57.0k users

...