Spark can only auto-discover partitions that are encoded as partition_name=value
in the path, so you'll have to create the partition columns yourself.
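For contrast, here is a minimal sketch of the Hive-style layout Spark does discover on its own (the s3:// URI and Parquet format are illustrative assumptions):

# Layout: .../table/year=2021/month=01/day=10/hour=14/part-00000.parquet
df = spark.read.parquet("s3://bucket/directory/table/")
df.printSchema()  # year, month, day, hour appear as partition columns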
After you load the paths bucket/directory/table/aaaa/bb/cc/dd/
into a DataFrame, you can extract those partitions from the source file name, which you can get with input_file_name().
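A minimal sketch of such a load, assuming the files are Parquet under an s3:// bucket (both are assumptions; adjust the format and URI to your storage):

# Glob down to the leaf hour directories so Spark reads the files directly,
# without attempting partition discovery on the non key=value directories.
df = spark.read.parquet("s3://bucket/directory/table/*/*/*/*/")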
First, split the file path on the /
delimiter, then create columns from the four date elements near the end of the path (the very last element is the file name, or an empty string when the path ends with /):
from pyspark.sql import functions as F

df1 = (
    df.withColumn("date_partitions", F.slice(F.split(F.input_file_name(), "/"), -5, 4))
    # slice(-5, 4) skips the last element of the split (the file name, or an
    # empty string when the path ends with /) and keeps year/month/day/hour
    .withColumn("year", F.col("date_partitions").getItem(0))
    .withColumn("month", F.col("date_partitions").getItem(1))
    .withColumn("day", F.col("date_partitions").getItem(2))
    .withColumn("hour", F.col("date_partitions").getItem(3))
    .drop("date_partitions")
)
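If you prefer to skip the intermediate array column, element_at (1-based, negative indices count from the end, Spark 2.4+) works too; a sketch of the same extraction:

# Build the split expression once and index it from the end of the path
parts = F.split(F.input_file_name(), "/")
df2 = (
    df.withColumn("year", F.element_at(parts, -5))
    .withColumn("month", F.element_at(parts, -4))
    .withColumn("day", F.element_at(parts, -3))
    .withColumn("hour", F.element_at(parts, -2))
)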
Example:

data = [
    (1, 2, "bucket/directory/table/2021/01/10/14/"),
    (3, 4, "bucket/directory/table/2021/01/11/18/")
]
df = spark.createDataFrame(data, ["a", "b", "input_file_name"])

Since the path here sits in a regular column instead of coming from actual files, substitute F.col("input_file_name") for F.input_file_name() when applying the snippet above.
Gives:
#+---+---+-------------------------------------+----+-----+---+----+
#|a |b |input_file_name |year|month|day|hour|
#+---+---+-------------------------------------+----+-----+---+----+
#|1 |2 |bucket/directory/table/2021/01/10/14/|2021|01 |10 |14 |
#|3 |4 |bucket/directory/table/2021/01/11/18/|2021|01 |11 |18 |
#+---+---+-------------------------------------+----+-----+---+----+
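Once the columns exist, you can optionally persist the data in the Hive-style layout so that future reads discover the partitions automatically (the output path is an illustrative assumption):

df1.write.partitionBy("year", "month", "day", "hour").parquet("s3://bucket/directory/table_partitioned/")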