I often need to perform custom aggregations on DataFrames in Spark 2.1, and have used these two approaches:
- Using groupBy/collect_list to get all the values in a single row, then applying a UDF to aggregate the values
- Writing a custom UDAF (user-defined aggregate function)
I generally prefer the first option, as it's easier to implement and more readable than the UDAF implementation. I would expect the first option to be slower, because more data is sent around the network (no partial aggregation), but in my experience UDAFs are generally slow. Why is that?
Concrete example: calculating histograms.
The data is in a Hive table (1E6 random double values):
val df = spark.table("testtable")
def roundToMultiple(d:Double,multiple:Double) = Math.round(d/multiple)*multiple
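To show what the binning helper does, here is a small sketch that can be pasted into a Scala REPL. The sample inputs are made up for illustration; only roundToMultiple comes from my code above.

```scala
// Same helper as above, with an explicit return type.
def roundToMultiple(d: Double, multiple: Double): Double =
  Math.round(d / multiple) * multiple

// Hypothetical sample values, not taken from the Hive table.
val samples = Seq(0.1, 0.125, 0.3, 0.6, 0.9)

// Math.round rounds half up, so each value lands on the nearest multiple of 0.25.
val bins = samples.map(x => roundToMultiple(x, 0.25))
println(bins) // List(0.0, 0.25, 0.25, 0.5, 1.0)
```

With a bin width of 0.25, the bins 0.0 and 1.0 each cover only half an interval on [0, 1], so they would receive roughly half as many values as the inner bins, assuming the values are uniformly distributed on that range.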
UDF approach:
val udf_histo = udf((xs:Seq[Double]) => xs.groupBy(x => roundToMultiple(x,0.25)).mapValues(_.size))
df.groupBy().agg(collect_list($"x").as("xs")).select(udf_histo($"xs")).show(false)
+--------------------------------------------------------------------------------+
|UDF(xs)                                                                         |
+--------------------------------------------------------------------------------+
|Map(0.0 -> 125122, 1.0 -> 124772, 0.75 -> 250819, 0.5 -> 248696, 0.25 -> 250591)|
+--------------------------------------------------------------------------------+
UDAF approach:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable
class HistoUDAF(binWidth:Double) extends UserDefinedAggregateFunction {
  override def inputSchema: StructType =
    StructType(
      StructField("value", DoubleType) :: Nil
    )
  override def bufferSchema: StructType =
    new StructType()
      .add("histo", MapType(DoubleType, IntegerType))
  override def deterministic: Boolean = true
  override def dataType: DataType = MapType(DoubleType, IntegerType)
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Map[Double, Int]()
  }
  private def mergeMaps(a: Map[Double, Int], b: Map[Double, Int]) = {
    a ++ b.map { case (k,v) => k -> (v + a.getOrElse(k, 0)) }
  }
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val oldBuffer = buffer.getAs[Map[Double, Int]](0)
    val newInput = Map(roundToMultiple(input.getDouble(0),binWidth) -> 1)
    buffer(0) = mergeMaps(oldBuffer, newInput)
  }
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val a = buffer1.getAs[Map[Double, Int]](0)
    val b = buffer2.getAs[Map[Double, Int]](0)
    buffer1(0) = mergeMaps(a, b)
  }
  override def evaluate(buffer: Row): Any = {
    buffer.getAs[Map[Double, Int]](0)
  }
}
val histo = new HistoUDAF(0.25)
df.groupBy().agg(histo($"x")).show(false)
+--------------------------------------------------------------------------------+
|histoudaf(x)                                                                    |
+--------------------------------------------------------------------------------+
|Map(0.0 -> 125122, 1.0 -> 124772, 0.75 -> 250819, 0.5 -> 248696, 0.25 -> 250591)|
+--------------------------------------------------------------------------------+
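To make the comparison concrete without Spark, here is a minimal sketch on plain Scala collections of what I understand the two strategies to do. The sample data and its split into three "partitions" are made up, and the sketch says nothing about Spark's actual execution or performance; it only shows that both routes produce the same histogram.

```scala
// Plain Scala collections only, no Spark.
def roundToMultiple(d: Double, multiple: Double): Double =
  Math.round(d / multiple) * multiple

// Same merge logic as mergeMaps in HistoUDAF.
def mergeMaps(a: Map[Double, Int], b: Map[Double, Int]): Map[Double, Int] =
  a ++ b.map { case (k, v) => k -> (v + a.getOrElse(k, 0)) }

// Hypothetical data, split into three hypothetical partitions.
val partitions: Seq[Seq[Double]] =
  Seq(Seq(0.1, 0.3, 0.9), Seq(0.6, 0.3), Seq(0.125))

// Strategy 1 (collect_list + UDF): gather every value, then aggregate once.
val collected: Map[Double, Int] =
  partitions.flatten
    .groupBy(x => roundToMultiple(x, 0.25))
    .map { case (k, vs) => k -> vs.size }

// Strategy 2 (UDAF): one partial histogram per partition (like update),
// then the partial histograms are combined (like merge).
val partials: Seq[Map[Double, Int]] = partitions.map { part =>
  part.foldLeft(Map.empty[Double, Int]) { (buf, x) =>
    mergeMaps(buf, Map(roundToMultiple(x, 0.25) -> 1))
  }
}
val merged: Map[Double, Int] =
  partials.foldLeft(Map.empty[Double, Int])((a, b) => mergeMaps(a, b))

println(collected == merged) // true
```

Note that in the second strategy a new immutable Map is built for every single input value, mirroring what update does in my UDAF.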
My tests show that the collect_list/UDF approach is about 2 times faster than the UDAF approach. Is this a general rule, or are there cases where a UDAF is really much faster and the rather awkward implementation is justified?