Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
351 views
in Technique[技术] by (71.8m points)

scala - How can I pass extra parameters to UDFs in Spark SQL?

I want to parse the date columns in a DataFrame, and for each date column, the resolution for the date may change (i.e. 2011/01/10 => 2011 /01 if the resolution is set to "Month").

I wrote the following code:

def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =
{
  import org.apache.spark.sql.functions._
  val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)}
  val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)}

  val allColNames = dataframe.columns
  val allCols = allColNames.map(name => dataframe.col(name))

  val mappedCols =
  {
    for(i <- allCols.indices) yield
    {
      schema(i) match
      {
        case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i)))
        case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
        case _ => allCols(i)
      }
    }
  }

  dataframe.select(mappedCols:_*)

}}

However it doesn't work. It seems that I can only pass Columns to UDFs. And I wonder if it will be very slow if I convert the DataFrame to RDD and apply the function on each row.

Does anyone know the correct solution? Thank you!

Question&Answers:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Just use a little bit of currying:

def convertDateFunc(resolution: DateResolutionType) = udf((x:String) => 
  SparkDateTimeConverter.convertDate(x, resolution))

and use it as follows:

case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))

On a side note you should take a look at sql.functions.trunc and sql.functions.date_format. These should at least part of the job without using UDFs at all.

Note:

In Spark 2.2 or later you can use typedLit function:

import org.apache.spark.sql.functions.typedLit

which support a wider range of literals like Seq or Map.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...