Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
415 views
in Technique[技术] by (71.8m points)

pyspark - Why is the fold action necessary in Spark?

I've a silly question involving fold and reduce in PySpark. I understand the difference between these two methods, but, if both need that the applied function is a commutative monoid, I cannot figure out an example in which fold cannot be substituted byreduce`.

Besides, in the PySpark implementation of fold it is used acc = op(obj, acc), why this operation order is used instead of acc = op(acc, obj)? (this second order sounds more closed to a leftFold to me)

Cheers

Tomas

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Empty RDD

It cannot be substituted when RDD is empty:

val rdd = sc.emptyRDD[Int]
rdd.reduce(_ + _)
// java.lang.UnsupportedOperationException: empty collection at   
// org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$ ...

rdd.fold(0)(_ + _)
// Int = 0

You can of course combine reduce with condition on isEmpty but it is rather ugly.

Mutable buffer

Another use case for fold is aggregation with mutable buffer. Consider following RDD:

import breeze.linalg.DenseVector

val rdd = sc.parallelize(Array.fill(100)(DenseVector(1)), 8)

Lets say we want a sum of all elements. A naive solution is to simply reduce with +:

rdd.reduce(_ + _)

Unfortunately it creates a new vector for each element. Since object creation and subsequent garbage collection is expensive it could be better to use a mutable object. It is not possible with reduce (immutability of RDD doesn't imply immutability of the elements), but can be achieved with fold as follows:

rdd.fold(DenseVector(0))((acc, x) => acc += x)

Zero element is used here as mutable buffer initialized once per partition leaving actual data untouched.

acc = op(obj, acc), why this operation order is used instead of acc = op(acc, obj)

See SPARK-6416 and SPARK-7683


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...