Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
2.5k views
in Technique[技术] by (71.8m points)

apache spark - Usage of custom Python object in Pyspark UDF

When running following piece of PySpark code:

nlp = NLPFunctions()

def parse_ingredients(ingredient_lines):
    parsed_ingredients = nlp.getingredients_bulk(ingredient_lines)[0]
    return list(chain.from_iterable(parsed_ingredients))


udf_parse_ingredients = UserDefinedFunction(parse_ingredients, ArrayType(StringType()))

I get the following error: _pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects

I imagine this is because PySpark can not serialize this custom class. But how can I avoid the overhead of instantiating this expensive object on every run of the parse_ingredients_line function?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Let's say you want to use Identity class defined like this (identity.py):

class Identity(object):                   
    def __getstate__(self):
        raise NotImplementedError("Not serializable")

    def identity(self, x):
        return x

you can for example use a callable object (f.py) and store an Identity instance as a class member:

from identity import Identity

class F(object):                          
    identity = None

    def __call__(self, x):
        if not F.identity:
            F.identity = Identity()
        return F.identity.identity(x)

and use these as shown below:

from pyspark.sql.functions import udf
import f

sc.addPyFile("identity.py")
sc.addPyFile("f.py")

f_ = udf(f.F())

spark.range(3).select(f_("id")).show()
+-----+
|F(id)|
+-----+
|    0|
|    1|
|    2|
+-----+

or standalone function and closure:

from pyspark.sql.functions import udf
import identity

sc.addPyFile("identity.py")

def f(): 
    dict_ = {}                 
    @udf()              
    def f_(x):                 
        if "identity" not in dict_:
            dict_["identity"] = identity.Identity()
        return dict_["identity"].identity(x)
    return f_


spark.range(3).select(f()("id")).show()
+------+
|f_(id)|
+------+
|     0|
|     1|
|     2|
+------+

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...