Let's say I have 280 parquet files. Each of those files was already partitioned/prepared by some other system component so I don't want to split them or join them.
I would like to load 3 columns out of each file and process each file on a different executor thread.
In the past I managed to achieve this kind of parallelization with .json files like this:
rdd = sparkContext \
    .textFile(data_pattern) \
    .mapPartitions(...)
where data_pattern can be either a comma-separated list of .json files, a single directory, or a comma-separated list of directories.
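Spelled out a bit more, that pipeline looked roughly like the sketch below; parse_lines and the concrete data_pattern value are only illustrative, not my real code:

from pyspark import SparkContext
import json

sc = SparkContext.getOrCreate()

def parse_lines(lines):
    # lines is an iterator over the records of one partition
    # (textFile creates at least one partition per input file),
    # so each file's contents get parsed independently on an executor
    for line in lines:
        record = json.loads(line)
        yield (record["A"], record["B"], record["C"])

data_pattern = "/data/file1.json,/data/file2.json"  # illustrative only
rdd = sc.textFile(data_pattern).mapPartitions(parse_lines)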
What I have tried for parquet files so far is this kind of setup:
import pandas

def process(url):
    df = pandas.read_parquet(url, columns=["A", "B", "C"])
    ...  # per-file processing of the three columns

rdd = spark.sparkContext \
    .parallelize(input_files, numSlices=len(input_files)) \
    .flatMap(lambda url: process(url))
But what I dislike about this is the use of the pandas library to load the files: if they are coming from the cloud, fsspec sometimes has issues with credentials, so I want the whole loading process to happen on the PySpark side.
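To illustrate the credentials pain with the pandas route: in recent pandas versions the fsspec credentials have to be passed explicitly (or be available in the environment) on every executor, roughly like this - the bucket name and credential values below are just placeholders:

df = pandas.read_parquet(
    "s3://some-bucket/part-0001.parquet",   # placeholder cloud path
    columns=["A", "B", "C"],
    storage_options={                        # forwarded to fsspec / s3fs
        "key": "...",                        # credentials would have to reach every executor
        "secret": "...",
    },
)

Spark, on the other hand, already reaches the storage through its own Hadoop configuration, which is why I would prefer to keep the reading there.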
When I do just this:
rdd = spark.read.parquet(data_pattern).select("A", "B", "C")
then rdd becomes just a DataFrame object, DataFrame[A: string, B: double, C: int].
As I understand it, all my files matching data_pattern were merged into a single DataFrame (I am also worried about memory here: what if I have 6000 * 300 MB files?), and I would have to use some partitionBy function, which I would like to avoid because, as I said, every parquet file already contains all the data I need. I just want to load each parquet file separately and map a function over it.
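As a sanity check (sketch only), one can tag each row with its source file using the standard input_file_name() function to see that rows from all the files end up in the same DataFrame:

from pyspark.sql.functions import input_file_name

df = (spark.read.parquet(data_pattern)
          .withColumn("source_file", input_file_name())
          .select("A", "B", "C", "source_file"))
# one count row per input file, confirming everything was merged into one DataFrame
df.groupBy("source_file").count().show(truncate=False)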
Can anyone help?