I'm trying to run a Spark application using spark-submit.
I've created the following UDF:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from tldextract import tldextract
@udf(StringType())
def get_domain(url):
    ext = tldextract.extract(url)
    return ext.domain
Then I used it like this:
df = df.withColumn('domain', col=get_domain(df['url']))
And got the following error:
Driver stacktrace:
21/01/03 16:53:41 INFO DAGScheduler: Job 1 failed: showString at NativeMethodAccessorImpl.java:0, took 2.842401 s
Traceback (most recent call last):
File "/home/michal/dv-etl/main.py", line 54, in <module>
main()
File "/home/michal/dv-etl/main.py", line 48, in main
df.show(truncate=False)
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 442, in show
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 134, in deco
File "<string>", line 3, in raise_from
pyspark.sql.utils.PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 589, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 447, in read_udfs
udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 254, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 74, in read_command
command = serializer._read_with_length(file)
File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
return self.loads(obj)
File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
return pickle.loads(obj, encoding=encoding)
File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 1110, in subimport
__import__(name)
ModuleNotFoundError: No module named 'tldextract'
I zipped the dependencies and packaged the application as an egg file, but it still doesn't work.
My Spark cluster consists of a master and one worker on the same server.
I suspect the problem is the UDF, because tldextract works when I call it from a regular function instead of from the UDF.
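For example, a plain driver-side version along these lines runs without errors (the name get_domain_local and the example URL are just for illustration):

from tldextract import tldextract

def get_domain_local(url):
    # Same logic as the UDF, but executed on the driver, not on a worker
    ext = tldextract.extract(url)
    return ext.domain

print(get_domain_local('https://spark.apache.org/docs/latest/'))  # prints 'apache'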
This is my spark-submit command:
spark-submit --master spark://spark-server:7077 main.py --py-files dist/app-0.0.1-py3.8.egg requirements.zip
Thank you!