I have a dataset of around 190 GB that was partitioned into 1000 partitions.
My EMR cluster allows a maximum of 10 r5a.2xlarge TASK nodes and 2 CORE nodes, each node having 64 GB of memory and 128 GB of EBS storage.
In my Spark job execution, I have set:
- executor-cores 5
- driver-cores 5
- executor-memory 40g
- driver-memory 50g
- spark.yarn.executor.memoryOverhead=10g
- spark.sql.shuffle.partitions=500
- spark.dynamicAllocation.enabled=true
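(For reference, these values are passed as spark-submit flags. Below is a minimal PySpark sketch, with a placeholder app name, that prints which of them actually took effect on the cluster:)

```python
# Minimal sketch: print the Spark settings that actually took effect on the
# cluster, to confirm the spark-submit flags were applied as intended.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("config-check").getOrCreate()  # placeholder app name

for key in [
    "spark.executor.cores",
    "spark.executor.memory",
    "spark.driver.memory",
    "spark.yarn.executor.memoryOverhead",
    "spark.sql.shuffle.partitions",
    "spark.dynamicAllocation.enabled",
]:
    # get() returns the explicitly set value, or the fallback string for unset keys
    print(key, "=", spark.sparkContext.getConf().get(key, "<not set>"))
```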
But my job keeps failing with errors like:
- org.apache.spark.shuffle.MetadataFetchFailedException
- org.apache.spark.shuffle.FetchFailedException
- java.io.IOException: No space left on device
- Container Lost
etc.
A lot of the answers to these kinds of issues that I found online say to increase memoryOverhead, which I did: from 2 GB to 10 GB. My total of executor memory plus memoryOverhead is now 50 GB, with 40 GB allocated to the executor and 10 GB to overhead. But I think I am reaching the limit, since I won't be able to go above 56 GB.
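(As a back-of-the-envelope check of that ceiling, here is the arithmetic, assuming EMR's default YARN allocation of 57344 MB, roughly 56 GB, on a 64 GB r5a.2xlarge; worth verifying against yarn.nodemanager.resource.memory-mb on the cluster:)

```python
# Rough container-sizing check. The 57344 MB figure is an assumption based on
# EMR's default yarn.nodemanager.resource.memory-mb for this instance size;
# verify it against your cluster's yarn-site.xml.
yarn_limit_gb = 57344 / 1024        # ~56 GB that YARN can hand out per node
executor_memory_gb = 40             # --executor-memory
memory_overhead_gb = 10             # spark.yarn.executor.memoryOverhead

container_gb = executor_memory_gb + memory_overhead_gb
print(f"container request: {container_gb} GB, "
      f"node ceiling: {yarn_limit_gb:.0f} GB, "
      f"headroom: {yarn_limit_gb - container_gb:.0f} GB")
```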
I thought I had done all that was possible to optimize my Spark job:
- Increase partitions
- Increase spark.sql.shuffle.partitions
- Increase executor and overhead memory
But my job still fails. Is there anything else I can try? Should I increase the overhead even more, so that executor memory and overhead memory are split 50/50?
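(For reference, a minimal PySpark sketch of the first two adjustments from the list above; the input path is a placeholder and the counts mirror the numbers mentioned earlier:)

```python
# Minimal sketch of the partitioning adjustments listed above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Raise the number of partitions used for shuffles (joins, aggregations).
spark.conf.set("spark.sql.shuffle.partitions", "500")

# Re-split the input across more partitions; the path is a placeholder.
df = spark.read.parquet("s3://my-bucket/my-190gb-dataset/")
df = df.repartition(1000)
print(df.rdd.getNumPartitions())   # should print 1000
```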
The memory profile of my job from Ganglia showed a steep drop at the point where the cluster flushed all the executor nodes due to them being dead.
Any insight would be greatly appreciated.
Thank you.
EDIT: [SOLUTION]
I am appending to my post the exact solution that solved my problem, thanks to Debuggerrr, based on the suggestions in his answer.
- I had a large DataFrame that I was re-using after doing many computations on other DataFrames. By using the persist() method (suggested by Debuggerrr), I was able to save it to MEMORY and DISK and simply call it back without parts of it being cleaned up by the GC (see the sketch after this list).
- I also followed the best-practices blog Debuggerrr mentioned in his answer and calculated the correct executor memory, number of executors, etc. But what I had failed to do was disable spark.dynamicAllocation.enabled. The blog states that it is best to set this property to false when you are calculating the resources manually, since Spark tends to misallocate resources if your calculation doesn't line up with its own. Once I set it to false and set the correct executor and Spark attributes, it worked like a charm (the exact flags are in EDIT 2 below)!
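A minimal PySpark sketch of the caching fix from the first bullet; the DataFrame name and input path are placeholders:

```python
# Minimal sketch of the persist() fix: keep the heavily re-used DataFrame in
# memory, spilling to local disk when it does not fit, so later stages read it
# back instead of recomputing it.
from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder for the large DataFrame that many downstream computations re-use.
large_df = spark.read.parquet("s3://my-bucket/my-190gb-dataset/")

large_df.persist(StorageLevel.MEMORY_AND_DISK)
large_df.count()       # materialize the cache before the downstream jobs run

# ... the other computations that reference large_df go here ...

large_df.unpersist()   # release memory and disk once it is no longer needed
```

MEMORY_AND_DISK stores the partitions that do not fit in executor memory on local disk, so the cached data survives memory pressure without being recomputed.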
[EDIT 2]:
The parameters that specifically worked for my job are:
--executor-cores 5 --driver-cores 5 --executor-memory 44g --driver-memory 44g --num-executors 9 --conf spark.default.parallelism=100 --conf spark.sql.shuffle.partitions=300 --conf spark.yarn.executor.memoryOverhead=11g --conf spark.shuffle.io.retryWait=180s --conf spark.network.timeout=800s --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.dynamicAllocation.enabled=false
question from: https://stackoverflow.com/questions/65866586/optimizing-spark-resources-to-avoid-memory-and-space-usage