You can use lag to check whether the previous row's age is null; if it is and the current row's age is not null, flag that row as the start of a block. A running sum of those flags then gives the block number you want.
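For a self-contained run, here is one way to rebuild the sample input from the expected output shown below (the SparkSession setup and the DDL schema string are assumptions, not part of the original question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumed active session

# Sample data reconstructed from the expected output below
df = spark.createDataFrame(
    [(1, None), (2, 10), (3, 90), (4, None), (5, None),
     (6, None), (7, 20), (8, 30), (9, 70), (10, None)],
    'ID int, age int'
)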
from pyspark.sql import functions as F, Window

w = Window.orderBy('ID')

# Flag rows where a non-null age follows a null (or missing) previous age,
# then take a running sum of the flags to number the blocks; rows with a
# null age keep a null block.
df2 = df.withColumn(
    'block',
    (~F.col('age').isNull() & F.lag('age').over(w).isNull()).cast('int')
).withColumn(
    'block',
    F.when(~F.col('age').isNull(), F.sum('block').over(w))
)
df2.show()
+---+----+-----+
| ID| age|block|
+---+----+-----+
| 1|null| null|
| 2| 10| 1|
| 3| 90| 1|
| 4|null| null|
| 5|null| null|
| 6|null| null|
| 7| 20| 2|
| 8| 30| 2|
| 9| 70| 2|
| 10|null| null|
+---+----+-----+
Then you can do the aggregation:
# Keep only rows that belong to a block, then aggregate per block.
df3 = (df2.filter('block is not null')
          .groupBy('block')
          .agg(F.min('ID').alias('First_ID'),
               F.max('ID').alias('Last_ID'),
               F.avg('age').alias('avg_age'))
          .drop('block'))
df3.show()
+--------+-------+-------+
|First_ID|Last_ID|avg_age|
+--------+-------+-------+
| 2| 3| 50.0|
| 7| 9| 40.0|
+--------+-------+-------+
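If you need the summary in a guaranteed order (the output above just happens to come out sorted), you can sort on First_ID before showing it:

df3.orderBy('First_ID').show()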