Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
406 views
in Technique[技术] by (71.8m points)

apache spark - Reading partition columns without partition column names

We have data stored in s3 partitioned in the following structure:

bucket/directory/table/aaaa/bb/cc/dd/

where aaaa is the year, bb is the month, cc is the day and dd is the hour.

As you can see, there are no partition keys in the path (year=aaaa, month=bb, day=cc, hour=dd).

As a result, when I read the table into Spark, there is no year, month, day or hour columns.

Is there anyway I can read the table into Spark and include the partitioned column without:

  • changing the path names in s3
  • iterating over each partition value in a loop and reading each partition one by one into Spark (it is a huge table and this takes far too long and is obviously sub-optimal).
question from:https://stackoverflow.com/questions/65869626/reading-partition-columns-without-partition-column-names

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Spark can't discover partitions that aren't encoded as partition_name=value in the path so you'll have to create them.

After you load the the paths bucket/directory/table/aaaa/bb/cc/dd/ into you a DataFrame, you can extract those partitions from the source filename which you get with input_file_name().

First, split the filename path using / delimiter then create columns from the last 4 elements:

from pyspark.sql import functions as F

df1 = df.withColumn("date_partitions", F.slice(F.split(F.input_file_name(), "/"), -5, 4)) 
    .withColumn("year", F.col("date_partitions").getItem(0)) 
    .withColumn("month", F.col("date_partitions").getItem(1)) 
    .withColumn("day", F.col("date_partitions").getItem(2)) 
    .withColumn("hour", F.col("date_partitions").getItem(3)) 
    .drop("data_partitions")

Example:

data = [
    (1, 2, "bucket/directory/table/2021/01/10/14/"),
    (3, 4, "bucket/directory/table/2021/01/11/18/")
]

df = spark.createDataFrame(data, ["a", "b", "input_file_name"])

Gives:

#+---+---+-------------------------------------+----+-----+---+----+
#|a  |b  |input_file_name                      |year|month|day|hour|
#+---+---+-------------------------------------+----+-----+---+----+
#|1  |2  |bucket/directory/table/2021/01/10/14/|2021|01   |10 |14  |
#|3  |4  |bucket/directory/table/2021/01/11/18/|2021|01   |11 |18  |
#+---+---+-------------------------------------+----+-----+---+----+

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...