Context
Spark 2.0.1, spark-submit in cluster mode. I am reading a parquet file from hdfs:
val spark = SparkSession.builder
  .appName("myApp")
  .config("hive.metastore.uris", "thrift://XXX.XXX.net:9083")
  .config("spark.sql.sources.bucketing.enabled", true)
  .enableHiveSupport()
  .getOrCreate()

val df = spark.read
  .format("parquet")
  .load("hdfs://XXX.XX.X.XX/myParquetFile")
I am saving the df to a Hive table with 50 buckets, bucketed by userid:
df.write
  .bucketBy(50, "userid")
  .saveAsTable("myHiveTable")
Now, when I look into the Hive warehouse on HDFS at /user/hive/warehouse, there is a folder named myHiveTable. Inside it are a bunch of part-*.parquet files. I would expect there to be 50 files, but there are 3201! That is about 64 files per bucket. Why? The number of files per bucket differs between the datasets I have saved as Hive tables. All the files are very small, just tens of KB each.
Let me add that myParquetFile contains about 1,000,000 distinct userid values.
Question
Why are there 3201 files in the folder instead of 50? What are they?
When I read this table back into DataFrame and print number of partitions:
val df2 = spark.sql("SELECT * FROM myHiveTable")
println(df2.rdd.getNumPartitions)
The number of partitions is correctly 50, and I confirmed that the data is correctly partitioned by userid.
For one of my large datasets (3 TB) I created a table with 1000 partitions, which produced literally about a million files! This exceeds the directory item limit of 1048576 and raises org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException.
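A quick sanity check on these numbers, under a model I have not confirmed: suppose each write task can emit up to one file per bucket, so the file count is bounded by tasks × buckets. I am also assuming the "1000 partitions" above behave like bucketBy buckets. The object and function names below are only illustrative.

```scala
object BucketFileCount {
  // Assumed model: each write task emits at most one file per bucket.
  def maxFiles(writeTasks: Int, buckets: Int): Long =
    writeTasks.toLong * buckets

  // Largest number of write tasks that keeps the file count within a directory item limit.
  def tasksBeforeLimit(dirLimit: Long, buckets: Int): Long =
    dirLimit / buckets

  def main(args: Array[String]): Unit = {
    println(3201 / 50)                         // 64 (integer division), remainder 1
    println(maxFiles(64, 50))                  // 3200
    println(tasksBeforeLimit(1048576L, 1000))  // 1048
  }
}
```

If the model holds, 64 write tasks would account for roughly the 3201 files I see, and with 1000 buckets anything above 1048 write tasks could exceed the 1048576 directory item limit.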
Question
What does the number of files created depend on?
Question
Is there a way to limit the number of files created?
Question
Should I worry about these files? Does having all these files hurt performance on df2? It is often said that we should not create too many partitions because that causes problems.
Question
I found this post, HIVE Dynamic Partitioning tips, which says that the number of files might be related to the number of mappers. It suggests using distribute by while inserting into a Hive table. How could I do that in Spark?
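My best guess at a Spark equivalent of distribute by is to repartition the DataFrame on the bucketing column before writing, so that all rows for a given userid end up in the same write task. This is an untested sketch: I have not verified that it actually reduces the file count on Spark 2.0.1, and reusing the bucket count as the number of partitions is my own assumption.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object BucketedWrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("myApp")
      .enableHiveSupport()
      .getOrCreate()

    val df = spark.read.parquet("hdfs://XXX.XX.X.XX/myParquetFile")

    // Assumption: use the same number for shuffle partitions and buckets.
    val numBuckets = 50

    // Shuffle by userid first (the DataFrame analogue of DISTRIBUTE BY userid),
    // then write the bucketed table.
    df.repartition(numBuckets, col("userid"))
      .write
      .bucketBy(numBuckets, "userid")
      .saveAsTable("myHiveTable")
  }
}
```

The repartition adds a full shuffle of the data before the write, which may be expensive for the 3 TB dataset.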
Question
If the problem is indeed as described in the link above, then How to control the file numbers of hive table after inserting data on MapR-FS suggests using options such as hive.merge.mapfiles or hive.merge.mapredfiles to merge all the small files after the map-reduce job. Are there equivalent options in Spark?