
apache spark - How to pass a constant value to Python UDF?

I was wondering whether it is possible to create a UDF that receives two arguments, a Column and another variable (an object, a dictionary, or any other type), performs some operations, and returns the result.

I attempted to do this but got an exception, so I would like to know whether there is any way around this problem.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

df = sqlContext.createDataFrame([("Bonsanto", 20, 2000.00),
                                 ("Hayek", 60, 3000.00),
                                 ("Mises", 60, 1000.0)],
                                ["name", "age", "balance"])

comparatorUDF = udf(lambda c, n: c == n, BooleanType())

df.where(comparatorUDF(col("name"), "Bonsanto")).show()

And I get the following error:

AnalysisException: u"cannot resolve 'Bonsanto' given input columns name, age, balance;"

So Spark clearly interprets the string "Bonsanto" as a column name, whereas I actually want to compare each record's value with that second argument.

On the other hand, I know that it's possible to use comparison operators inside a where clause (but I want to know whether the same is achievable with a UDF), as follows:

df.where(col("name") == "Bonsanto").show()

#+--------+---+-------+
#|    name|age|balance|
#+--------+---+-------+
#|Bonsanto| 20| 2000.0|
#+--------+---+-------+

1 Answer


Everything that is passed to a UDF is interpreted as a column or column name. If you want to pass a literal, you have two options:

  1. Pass the argument using currying:

    def comparatorUDF(n):
        return udf(lambda c: c == n, BooleanType())
    
    df.where(comparatorUDF("Bonsanto")(col("name")))
    

    This can be used with an argument of any type as long as it is serializable (see the dictionary-based sketch after this list).

  2. Use a SQL literal and the current implementation:

    from pyspark.sql.functions import lit
    
    df.where(comparatorUDF(col("name"), lit("Bonsanto")))
    

    This works only with supported literal types (strings, numerics, booleans); a numeric sketch follows below. For non-atomic types, see How to add a constant column in a Spark DataFrame?
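As a sketch of option 1 with a non-string argument, assuming a hypothetical dictionary of per-name minimum balances (the lookup table and the aboveMinUDF name are made up for illustration):

from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

# Hypothetical lookup table; any picklable object can be captured this way.
min_balance = {"Bonsanto": 1500.0, "Hayek": 2500.0, "Mises": 500.0}

def aboveMinUDF(lookup):
    # `lookup` is captured in the closure and serialized together with
    # the lambda, so it is available on the executors at runtime.
    return udf(lambda name, balance: balance >= lookup.get(name, 0.0),
               BooleanType())

df.where(aboveMinUDF(min_balance)(col("name"), col("balance"))).show()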
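And a minimal sketch of option 2 with a numeric literal, reusing the two-argument comparatorUDF from the question (the 2000.00 threshold is just an example value taken from the sample data):

from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import BooleanType

comparatorUDF = udf(lambda c, n: c == n, BooleanType())

# lit() wraps the Python value in a Column, so Spark no longer tries
# to resolve it as a column name.
df.where(comparatorUDF(col("balance"), lit(2000.00))).show()

#+--------+---+-------+
#|    name|age|balance|
#+--------+---+-------+
#|Bonsanto| 20| 2000.0|
#+--------+---+-------+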

