Introduction
I am not sure the title is clear. I am not a native English speaker, so if someone has a better summary of what this post is about, please edit!
Environment
python 3.5.2
pyspark 2.3.0
The context
I have a Spark DataFrame. Before being written to Elasticsearch, this data gets transformed. In my case there are two transformations; they are map functions applied to the DataFrame's underlying RDD.
However, instead of hard-coding them, I want my function (the one that handles the data transformation) to accept X functions and apply them one by one: the first to the DataFrame's RDD, and each subsequent one to the result of the previous transformation.
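For concreteness, each transformation here is just an ordinary Python function applied per record by map. Something along these lines (the name transfo1 and the fields are purely illustrative, not my real code):

def transfo1(row):
    # Hypothetical record-level transformation: turn a Row into a
    # (key, document) pair of the shape written out afterwards.
    doc = row.asDict()
    return doc.get("id"), doc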
Initial work
This is the previous, hard-coded state, which is not what I want:
(df.rdd
    .map(transfo1)
    .map(transfo2)
    .saveAsNewAPIHadoopFile)
What I have so far
def write_to_index(self, transformation_functions: list, dataframe):
    # ... stuff ...
    for transfo in transformation_functions:
        dataframe = dataframe.rdd.map(transfo)
    dataframe.saveAsNewAPIHadoopFile
However, this has a problem: if the first transformation does not return a DataFrame, the second iteration of the loop fails because the resulting object has no rdd attribute.
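In other words, after the first pass dataframe is no longer a DataFrame but an RDD, so the attribute lookup blows up on the next pass. A minimal sketch of the failure (transfo1 as in the illustrative example above):

rdd = dataframe.rdd.map(transfo1)  # rdd is now a pyspark RDD, not a DataFrame
rdd.rdd                            # AttributeError: an RDD has no 'rdd' attribute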
Working solution
object_to_process = dataframe.rdd
for transfo in transformation_functions:
    object_to_process = object_to_process.map(transfo)
object_to_process.saveAsNewAPIHadoopFile
The solution above seems to work (at least it does not throw any error). But I would like to know whether there is a more elegant way, or a built-in Python solution for this.
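For example, is something like functools.reduce the idiomatic way to express this fold? A minimal sketch of what I mean, under the same assumptions as above (the saveAsNewAPIHadoopFile arguments stay elided, as in my snippets):

from functools import reduce

def write_to_index(self, transformation_functions: list, dataframe):
    # Fold the transformations onto the RDD from left to right:
    # with [t1, t2] this is equivalent to dataframe.rdd.map(t1).map(t2)
    final_rdd = reduce(
        lambda rdd, transfo: rdd.map(transfo),
        transformation_functions,
        dataframe.rdd,
    )
    final_rdd.saveAsNewAPIHadoopFile  # output arguments elided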