First of all, Spark only starts reading the data when an action (like count, collect or write) is called. Once an action is called, Spark loads the data in partitions; the number of partitions loaded concurrently depends on the number of cores you have available. So in Spark you can think of 1 partition = 1 core = 1 task. Note that all concurrently loaded partitions have to fit into memory, or you will get an OOM.
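To put rough numbers on the 1 partition = 1 core = 1 task rule, here is a small back-of-the-envelope sketch in plain Python. It does not use Spark itself. The function names are made up for illustration, and the memory figure is only a crude estimate, since the in-memory size of a partition usually differs from its size on disk. The task_cpus parameter mirrors the spark.task.cpus setting, which defaults to 1.

```python
import math


def plan_waves(num_partitions: int, total_cores: int, task_cpus: int = 1) -> dict:
    """Estimate how many tasks run at once and how many rounds a stage needs.

    Hypothetical helper for illustration only; not part of any Spark API.
    """
    if num_partitions < 0 or total_cores < 1 or task_cpus < 1:
        raise ValueError("counts must be positive")
    slots = total_cores // task_cpus  # task slots available across the cluster
    if slots == 0:
        raise ValueError("not enough cores to run a single task")
    concurrent = min(slots, num_partitions)  # partitions held in memory at once
    waves = math.ceil(num_partitions / slots)  # rounds needed to read everything
    return {"concurrent_tasks": concurrent, "waves": waves}


def rough_peak_mb(partition_size_mb: float, concurrent_tasks: int) -> float:
    """Very rough estimate of the memory needed by the partitions loaded at once."""
    return partition_size_mb * concurrent_tasks


plan = plan_waves(num_partitions=200, total_cores=16)
print(plan)  # {'concurrent_tasks': 16, 'waves': 13}
print(rough_peak_mb(128, plan["concurrent_tasks"]))  # 2048
```

So with 200 partitions and 16 cores, only 16 partitions are in flight at any time, and it is those 16 (not all 200) that have to fit into memory together.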
Assuming that you have several stages, Spark then runs the transformations from the first stage on the loaded partitions only. Once it has applied the transformations to the data in the loaded partitions, it stores the output as shuffle data and then reads in more partitions. It repeats this cycle (read partitions, apply the transformations, store the output as shuffle data) until all data has been read.
If you apply no transformation and only do, for instance, a count, Spark will still read the data in partitions, but it will not store any data in your cluster, and if you run the count again it will read all the data once more. To avoid reading the data several times, you can call cache or persist, in which case Spark will try to store the data in your cluster. Both calls are lazy: the data is only stored when the next action runs. On an RDD, cache is the same as persist(StorageLevel.MEMORY_ONLY) and keeps partitions in memory; partitions that don't fit are not cached and are recomputed when they are needed again, so the usual symptom is repeated reads rather than an OOM. (On a DataFrame or Dataset, cache defaults to MEMORY_AND_DISK.) If you call persist(StorageLevel.MEMORY_AND_DISK), it will store as much as it can in memory and the rest will be put on disk. If the data doesn't fit on disk either, the OS will usually kill your workers.
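The difference between re-reading and caching can be illustrated with a toy model in plain Python. This is not Spark's API: ToyDataset and its source_reads counter are invented for this sketch, and it ignores memory limits and storage levels entirely. It only shows that an action scans the source each time unless the data was marked for caching beforehand.

```python
class ToyDataset:
    """Plain-Python stand-in for a lazily evaluated dataset (not Spark's API)."""

    def __init__(self, partitions):
        self._source = partitions      # pretend this lives in external storage
        self._cached = None            # filled by the first action after cache()
        self._cache_requested = False
        self.source_reads = 0          # how many times the source was scanned

    def cache(self):
        # Only marks the dataset; nothing is read until an action runs.
        self._cache_requested = True
        return self

    def _scan(self):
        if self._cached is not None:
            return self._cached        # served from the cache, no source read
        self.source_reads += 1
        data = [list(p) for p in self._source]
        if self._cache_requested:
            self._cached = data        # keep the partitions for later actions
        return data

    def count(self):
        # An "action": forces the data to be read, partition by partition.
        return sum(len(p) for p in self._scan())


raw = ToyDataset([[1, 2, 3], [4, 5], [6]])
raw.count()
raw.count()
print(raw.source_reads)     # 2: every count scans the source again

cached = ToyDataset([[1, 2, 3], [4, 5], [6]]).cache()
cached.count()
cached.count()
print(cached.source_reads)  # 1: the second count is served from the cache
```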
Note that Spark has its own little memory management system. Some of the memory that you assign to your Spark job is used to hold the data being worked on (execution memory), and some of it is used for storage if you call cache or persist (storage memory).
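As a rough illustration of that split, the sketch below estimates the sizes of the regions for a given executor heap. It is plain Python arithmetic, not a Spark call, and the function name is made up. It assumes the unified memory manager with what I understand to be the defaults in recent Spark versions: about 300 MB reserved, spark.memory.fraction = 0.6 and spark.memory.storageFraction = 0.5. Check the configuration docs for your version before relying on these numbers.

```python
RESERVED_MB = 300  # assumed fixed reserve of the unified memory manager


def unified_memory_mb(executor_heap_mb: float,
                      memory_fraction: float = 0.6,
                      storage_fraction: float = 0.5) -> dict:
    """Estimate memory regions for one executor (hypothetical helper)."""
    usable = max(executor_heap_mb - RESERVED_MB, 0)
    unified = usable * memory_fraction    # shared by execution and storage
    storage = unified * storage_fraction  # cached blocks protected from eviction
    return {
        "unified": unified,
        "storage": storage,
        "execution": unified - storage,   # shuffles, joins, sorts, aggregations
        "user": usable - unified,         # user data structures, Spark metadata
    }


for region, size in unified_memory_mb(4096).items():
    print(f"{region}: {size:.1f} MB")
```

The boundary between the two regions is soft: execution can borrow unused storage memory and the other way around, and cached blocks above the storage share can be evicted when execution needs the space.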
I hope this explanation helps :)