
apache spark - In Python 3.5.2, how to elegantly chain an unknown quantity of functions on an object that changes type?

Introduction

I am not certain the title is clear. I am not a native English speaker, so if someone has a better summary for what this post is about, please edit!

Environment

  • python 3.5.2
  • pyspark 2.3.0

The context

I have a Spark dataframe. Before being written to Elasticsearch, this data gets transformed.

In my case, I have two transformations. They are map functions on the rdd of the dataframe.

However, instead of hard-coding them, I want my function (the one that handles the data transformation) to accept X functions that are applied one by one: the first to the dataframe, and each subsequent one to the result of the previous transformation function.
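
To make that concrete, the two transformations could look something like this (the names and fields are hypothetical, just to illustrate that each function takes one record and may return a value of a different type):

# Hypothetical map functions: transfo1 turns a Row into a dict,
# transfo2 reshapes the dict into a (key, value) pair for Elasticsearch.
def transfo1(row):
    return row.asDict()

def transfo2(record):
    return (record["id"], record)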

Initial work

This is the previous, hard-coded state, which is not what I want:

df.rdd.map(transfo1) \
      .map(transfo2) \
      .saveAsNewAPIHadoopFile

What I have so far

def write_to_index(self, transformation_functions: list, dataframe):
    # stuff
    for transfo in transformation_functions:
        dataframe = dataframe.rdd.map(transfo)

    dataframe.saveAsNewAPIHadoopFile

However, this has a problem: if the first transformation does not return a dataframe, the loop fails on its second iteration because the resulting object has no rdd attribute.
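
Here is a minimal sketch of the failure, assuming two transformation functions: after the first pass through the loop, dataframe is rebound to an RDD, and an RDD has no rdd attribute:

# Iteration 1: dataframe is a DataFrame, so .rdd works;
# dataframe is now rebound to an RDD.
dataframe = dataframe.rdd.map(transfo1)

# Iteration 2: dataframe is an RDD, which has no .rdd attribute,
# so this line raises an AttributeError.
dataframe = dataframe.rdd.map(transfo2)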

Working solution

object_to_process = dataframe.rdd
for transfo in transformation_functions:
    object_to_process = object_to_process.map(transfo)

object_to_process.saveAsNewAPIHadoopFile

The solution above seems to work (at least it does not throw any errors). But I would like to know if there is a more elegant solution, or a built-in Python solution, for this.



1 Answer


You can use this one-liner:

from functools import reduce

def write_to_index(self, transformation_functions: list, dataframe):
    reduce(lambda x, y: x.map(y), transformation_functions, dataframe.rdd).saveAsNewAPIHadoopFile

which, if written verbosely, should be identical to

dataframe.rdd.map(transformation_functions[0]) \
             .map(transformation_functions[1]) \
             .map(...) \
             .saveAsNewAPIHadoopFile
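
To see why the two are equivalent: reduce starts from the initializer dataframe.rdd and folds each function into a .map call, rebinding the accumulator to the resulting RDD each time. Here is the same folding pattern in plain Python, with a hypothetical Pipeline class standing in for an RDD so it can run without Spark:

from functools import reduce

# Hypothetical stand-in for an RDD: a list wrapper with a .map method.
class Pipeline:
    def __init__(self, items):
        self.items = items

    def map(self, fn):
        return Pipeline([fn(x) for x in self.items])

transformations = [lambda x: x + 1, lambda x: x * 2]
result = reduce(lambda obj, fn: obj.map(fn), transformations, Pipeline([1, 2, 3]))
print(result.items)  # [4, 6, 8] -- each element went through +1, then *2

A nice side effect of passing dataframe.rdd as the initializer: if transformation_functions is empty, reduce simply returns the initializer, so the untransformed RDD is saved instead of raising an error.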


...