I have a dataframe, created as follows:
from pyspark.sql import functions as f
from pyspark.sql.window import Window
df = spark.createDataFrame([
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:30:57.000", "Username": "user1", "Region": "US"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:31:57.014", "Username": "user2", "Region": "US"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:32:57.914", "Username": "user1", "Region": "MX"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:35:57.914", "Username": "user2", "Region": "CA"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:33:57.914", "Username": "user1", "Region": "UK"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:34:57.914", "Username": "user1", "Region": "GR"},
{"groupId":"A","Day":"2021-01-27", "ts": "2021-01-27 08:36:57.914", "Username": "user2", "Region": "IR"}])
# Partition by group and user, then order rows chronologically within each
# partition. Note: chaining two orderBy calls keeps only the last one, and
# partitionBy() with no columns pulls all rows into a single partition.
w = Window.partitionBy("groupId", "Username").orderBy("ts")
df2 = df.withColumn("prev_region", f.lag(df.Region).over(w))
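
With this window, lag looks back one row within each (groupId, Username) partition, so the first row per user has no previous region. A quick sanity check (the expected prev_region values in the comments are worked out by hand from the sample rows):

df2.orderBy("Username", "ts").show(truncate=False)
# user1: US -> null, MX -> US, UK -> MX, GR -> UK
# user2: US -> null, CA -> US, IR -> CA

Ordered by Username and ts, the input rows are: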
+----------+------+--------+-------+-----------------------+
|Day       |Region|Username|groupId|ts                     |
+----------+------+--------+-------+-----------------------+
|2021-01-27|US    |user1   |A      |2021-01-27 08:30:57.000|
|2021-01-27|MX    |user1   |A      |2021-01-27 08:32:57.914|
|2021-01-27|UK    |user1   |A      |2021-01-27 08:33:57.914|
|2021-01-27|GR    |user1   |A      |2021-01-27 08:34:57.914|
|2021-01-27|US    |user2   |A      |2021-01-27 08:31:57.014|
|2021-01-27|CA    |user2   |A      |2021-01-27 08:35:57.914|
|2021-01-27|IR    |user2   |A      |2021-01-27 08:36:57.914|
+----------+------+--------+-------+-----------------------+
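
One caveat: ts is built here as a plain string. ISO-8601-formatted strings like these happen to sort chronologically, but casting to a real timestamp before ordering is safer. A minimal sketch, assuming the same format as the sample data:

# Cast the string column to TimestampType so the window ordering is
# chronological rather than lexicographic.
df = df.withColumn("ts", f.to_timestamp("ts", "yyyy-MM-dd HH:mm:ss.SSS"))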
question from: https://stackoverflow.com/questions/65930133/lag-function-on-grouped-data