Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share

pyspark - Writing large spark data frame as parquet to s3 bucket

My Scenario

  • I have a Spark data frame in an AWS Glue job with 4 million records
  • I need to write it as a SINGLE parquet file in AWS S3

Current code

file_spark_df.write.parquet("s3://"+target_bucket_name)

Issue: the above code creates 100+ files, each 17.8 to 18.1 MB in size; I guess it's some default breakdown size.

Ques 1: How do I create just one file for one Spark data frame? I checked https://spark.apache.org/docs/latest/sql-data-sources-parquet.html but didn't find any parameter to set.

Ques 2: How do I specify the name of the file? I tried ...

file_df.write.parquet("s3://"+target_bucket_name+"/"+target_file_name)

It created 100+ files inside "s3://"+target_bucket_name+"/"+target_file_name

Ques 3: I need to create subfolders inside the base S3 bucket. The following code can do the job:

file_df.write.parquet("s3://"+target_bucket_name+"/"+today_date+"/"+target_file_name)

Not sure if it's the best way ... or if there is a better way?

question from:https://stackoverflow.com/questions/65832736/writing-large-spark-data-frame-as-parquet-to-s3-bucket


1 Answer


Use .repartition(1) or, as @blackbishop says, .coalesce(1), to say "I only want one partition on the output".

  • Use a subdirectory: things don't like writing to the root path of a bucket, because it's not a normal directory.
  • Filenames are chosen by the partition code, so it's best to list the directory for the single file and rename it.

It should look something like this:

import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

val dest = "s3://" + target_bucket_name + "/subdir"
val destPath = new Path(dest)
val fs = FileSystem.get(destPath.toUri, conf) // where conf is the Hadoop conf from your Spark conf
fs.delete(destPath, true)
file_spark_df.repartition(1).write.parquet(dest)

// at this point there should be only one parquet file in the dest dir
// (listStatus also returns the _SUCCESS marker, so filter for .parquet)
val files = fs.listStatus(destPath).filter(_.getPath.getName.endsWith(".parquet"))
if (files.length != 1) throw new IOException("Wrong number of files in " + destPath)
fs.rename(files(0).getPath, new Path(destPath, "final-filename.parquet"))

(Note: code written at the console, not compiled or tested. You should get the idea, though.)

