Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.3k views
in Technique[技术] by (71.8m points)

apache spark - How to rename my JSON generated by pyspark?

When i write my JSON file with

dataframe.coalesce(1).write.format('json')

on pyspark im not able to change the name of file in the partition

Im writing my JSON like that:

dataframe.coalesce(1).write.format('json').mode('overwrite').save('path')

but im not able to change the name of file in the partition

I want the path like that:

/folder/my_name.json

where 'my_name.json' is a json file

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

In spark we can't control name of the file written to the directory.

First write the data to the HDFS directory then For changing the name of file we need to use HDFS api.

Example:

In Pyspark:

l=[("a",1)]
ll=["id","sa"]
df=spark.createDataFrame(l,ll)

hdfs_dir = "/folder/" #your hdfs directory
new_filename="my_name.json" #new filename

df.coalesce(1).write.format("json").mode("overwrite").save(hdfs_dir)

fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())

#list files in the directory

list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(hdfs_dir))

#filter name of the file starts with part-

file_name = [file.getPath().getName() for file in list_status if file.getPath().getName().startswith('part-')][0]

#rename the file

fs.rename(Path(hdfs_dir+''+file_name),Path(hdfs_dir+''+new_filename))

In case if you want to delete success files in the directory use fs.delete to delete _Success files.

In Scala:

val df=Seq(("a",1)).toDF("id","sa")
df.show(false)

import org.apache.hadoop.fs._

val hdfs_dir = "/folder/"
val new_filename="new_json.json"

df.coalesce(1).write.mode("overwrite").format("json").save(hdfs_dir)

val fs=FileSystem.get(sc.hadoopConfiguration)
val f=fs.globStatus(new Path(s"${hdfs_dir}" + "*")).filter(x => x.getPath.getName.toString.startsWith("part-")).map(x => x.getPath.getName).mkString

fs.rename(new Path(s"${hdfs_dir}${f}"),new Path(s"${hdfs_dir}${new_filename}"))

fs.delete(new Path(s"${hdfs_dir}" + "_SUCCESS"))

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...