• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

如何覆盖spark中的输出目录

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

我有一个Spark Streaming(流式)应用程序,每分钟产生一个数据集。我需要保存/覆盖处理的数据的结果。

当我试图覆盖数据集时,抛异常org.apache.hadoop.mapred.FileAlreadyExistsException,然后停止执行。

我设置了Spark属性set("spark.files.overwrite","true"),但没作用。

如何覆盖或预先删除Spark文件呢?

最佳解决思路

建议使用Dataframes,加上类似... .write.mode(SaveMode.Overwrite) ...的代码。

对于旧版本尝试

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)

在1.1.0中,您可以使用带有–conf标志的spark-submit脚本来设置conf设置。

次佳解决思路

参数spark.files.overwrite的意思:“当目标文件存在且其内容与源不匹配时,是否覆盖通过SparkContext.addFile()添加的文件”。所以它对saveAsTextFiles方法没有影响。

你可以在保存文件之前做到这一点:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }

这里有更多解释:http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696.html

第三种解决思路

从pyspark.sql.DataFrame.save文档来看(当前位于1.3.1),可以在保存DataFrame时指定mode='overwrite'

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')

经证实,这甚至会删除分区文件。因此,如果您最初说了10个分区/文件,然后用只有6个分区的DataFrame覆盖了该文件夹,则生成的文件夹将具有6个分区/文件。

有关模式选项的更多信息,请参阅Spark SQL documentation。

第四种思路

由于df.save(path, source, mode)已弃用,(http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame)

使用df.write.format(source).mode("overwrite").save(path),其中df.write是DataFrameWriter

‘source’可以是(“com.databricks.spark.avro” | “parquet” | “json”)

参考资料

  • How to overwrite the output directory in spark


鲜花

握手

雷人

路过

鸡蛋
专题导读
上一篇:
如何添加一个新的列到Spark DataFrame(使用PySpark)?发布时间:2022-05-14
下一篇:
在Java中,如何将字符串转换为JSONObject发布时间:2022-05-14
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap