I am using PySpark to estimate the parameters of a logistic regression model. Spark calculates the likelihood and gradients, which I then pass to SciPy's minimize function for optimization (with the L-BFGS-B method).
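Roughly, the setup looks like the sketch below (simplified; point_contrib and the data layout are hypothetical stand-ins for my actual model):

import numpy as np
from scipy.optimize import minimize

def point_contrib(x, y, theta):
    # Log-likelihood and gradient contribution of one observation
    # under plain logistic regression (a simplified stand-in).
    p = 1.0 / (1.0 + np.exp(-np.dot(x, theta)))
    ll = np.log(p) if y == 1 else np.log(1.0 - p)
    return ll, (y - p) * np.asarray(x)

def neg_log_lik_and_grad(theta, data_rdd):
    # Sum the contributions across the cluster; negate because we minimize.
    ll, grad = data_rdd.map(lambda xy: point_contrib(xy[0], xy[1], theta)) \
                       .reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    return -ll, -grad

# Hypothetical usage, where distData is an RDD of (features, label) pairs:
# res = minimize(neg_log_lik_and_grad, np.zeros(d), args=(distData,),
#                jac=True, method='L-BFGS-B')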
I run my application in yarn-client mode. The application starts without any problem, but after a while it reports the following error:
Traceback (most recent call last):
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/simulation/20160716-1626/spark_1m_data.py", line 115, in <module>
    res = trainEM2(distData, params0, verbose=True, em_tol=1e-5, opt_method='L-BFGS-B')
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 166, in trainEM
    options={'disp': False})
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/_minimize.py", line 447, in minimize
    callback=callback, **options)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 330, in _minimize_lbfgsb
    f, g = func_and_grad(x)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 278, in func_and_grad
    f = fun(x, *args)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/optimize.py", line 289, in function_wrapper
    return function(*(wrapper_args + args))
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 146, in fun_observedQj
    return dataAndWeightsj_old.map(lambda _: calObservedQj(_[0], _[1], vparamsj, params0)).sum()
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 995, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 869, in fold
    vals = self.mapPartitions(func).collect()
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 772, in collect
    return list(_load_from_socket(port, self._jrdd_deserializer))
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 543, in read_int
    length = stream.read(4)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/socket.py", line 384, in read
    data = self._sock.recv(left)
socket.timeout: timed out

A Java-side error was interleaved with the Python traceback at the same moment:

16/07/16 20:59:10 ERROR python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
I also saw a Python "broken pipe" error when I set the Spark log level to "ALL".
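For reference, this is how I raised the verbosity (sc.setLogLevel is part of the Spark 1.6 API):

# Raise log verbosity on the existing SparkContext; valid levels include
# "ALL", "DEBUG", "INFO", "WARN", "ERROR".
sc.setLogLevel("ALL")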
I am using Spark 1.6.2 and Java 1.8.0_91. Any idea what's going on?
--Update--
I found that this is related to the optimization routine used in my program.
I am estimating a statistical model by maximum likelihood, using the EM algorithm (an iterative algorithm). During each iteration I need to update the parameters by solving a minimization problem. Spark is responsible for calculating the likelihood and gradient, which are then passed to SciPy's minimize routine, where I use the L-BFGS-B method. Something in this routine seems to crash my Spark job, but I have no idea which part of it is responsible.
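The overall structure is roughly the skeleton below (e_step, m_step_objective, pack, unpack, and converged are hypothetical placeholders for my actual helpers):

from scipy.optimize import minimize

params, max_iter = params0, 100
for it in range(max_iter):
    # E-step: attach posterior weights to each record (e_step is a
    # placeholder for my actual weighting code).
    weighted = e_step(distData, params)
    weighted.cache()  # reused by every objective/gradient evaluation below
    # M-step: each call SciPy makes to the objective triggers a Spark job.
    res = minimize(m_step_objective, pack(params), args=(weighted,),
                   jac=True, method='L-BFGS-B', options={'disp': False})
    new_params = unpack(res.x)
    if converged(params, new_params, tol=1e-5):  # hypothetical stopping test
        break
    params = new_params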
Another observation: using the same sample and the same program, I varied only the number of partitions. When the number of partitions is small, my program finishes without any problem; when the number of partitions becomes large, the program starts to crash.
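The partition count is the only thing I change between runs, along these lines (the numbers here are just examples):

# Each L-BFGS-B evaluation collects results from every partition, so a
# larger partition count means more tasks and more Python<->JVM socket
# traffic per optimization step.
distData = sc.parallelize(samples, numSlices=8)    # small: finishes fine
distData = sc.parallelize(samples, numSlices=512)  # large: starts to crash
# or, for an existing RDD:
distData = distData.repartition(512)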