本文整理汇总了Scala中org.apache.spark.sql.functions.udf类的典型用法代码示例。如果您正苦于以下问题:Scala udf类的具体用法?Scala udf怎么用?Scala udf使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了udf类的4个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: DateTimeColumn
//设置package包名称以及导入依赖的类
package me.danielpes.spark.datetime
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.{DataType, DateType, TimestampType}
import org.apache.spark.sql.functions.{col, udf}
/**
 * Pairs a Spark [[Column]] with the temporal [[DataType]] its values are treated as,
 * so that a `Period` can be added to or subtracted from every value of the column.
 *
 * @param col      the wrapped column
 * @param dataType how the column's values are interpreted; only `DateType` and
 *                 `TimestampType` are handled by the arithmetic operators
 */
class DateTimeColumn(val col: Column, dataType: DataType = TimestampType) {

  /**
   * Builds a column whose values are the wrapped column's values shifted by `p`.
   *
   * The shift itself is delegated to `RichDate.+` inside a UDF.
   *
   * @param p the period to add
   * @return the derived column
   * @throws IllegalArgumentException if `dataType` is neither `DateType` nor `TimestampType`
   */
  def +(p: Period): Column = dataType match {
    case _: DateType =>
      udf((d: java.sql.Date) => new RichDate(d) + p).apply(col)
    case _: TimestampType =>
      udf((ts: java.sql.Timestamp) => new RichDate(ts) + p).apply(col)
    case other =>
      // Fail with a descriptive message instead of an opaque scala.MatchError.
      throw new IllegalArgumentException(
        s"Unsupported data type for date-time arithmetic: ${other.toString}; expected DateType or TimestampType")
  }

  /**
   * Builds a column whose values are the wrapped column's values shifted back by `p`.
   * Implemented as the addition of the negated period.
   */
  def -(p: Period): Column = this.+(-p)

  override def toString: String = s"{column: ${col.toString}, type: ${dataType.toString}}"
}
/**
 * Factory methods for [[DateTimeColumn]].
 *
 * The column may be given either as a [[Column]] or by name, and the data type either
 * as a [[DataType]] or as one of the strings `"date"` / `"timestamp"`. When no type is
 * given, `TimestampType` is used.
 */
object DateTimeColumn {

  def apply(col: Column, dataType: DataType = TimestampType): DateTimeColumn =
    new DateTimeColumn(col, dataType)

  def apply(col: Column, typeString: String): DateTimeColumn =
    new DateTimeColumn(col, typeFromString(typeString))

  def apply(cName: String): DateTimeColumn =
    new DateTimeColumn(col(cName), TimestampType)

  def apply(cName: String, dataType: DataType): DateTimeColumn =
    new DateTimeColumn(col(cName), dataType)

  def apply(cName: String, typeString: String): DateTimeColumn =
    new DateTimeColumn(col(cName), typeFromString(typeString))

  /**
   * Maps a type name to the corresponding Spark [[DataType]].
   *
   * @param s either `"date"` or `"timestamp"` (exact, case-sensitive match)
   * @throws IllegalArgumentException for any other string
   */
  private def typeFromString(s: String): DataType = s match {
    case "date" => DateType
    case "timestamp" => TimestampType
    case other =>
      // Fail with a descriptive message instead of an opaque scala.MatchError.
      throw new IllegalArgumentException(
        s"Unsupported type string '$other'; expected 'date' or 'timestamp'")
  }
}
开发者ID:danielpes,项目名称:spark-datetime-lite,代码行数:32,代码来源:DateTimeColumn.scala
示例2: OpenIE
//设置package包名称以及导入依赖的类
package com.knoldus
import edu.stanford.nlp.simple.{Document, Sentence}
import edu.stanford.nlp.util.Quadruple
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.udf
import scala.collection.JavaConverters._
/**
 * Value holder with four fields: `subject`, `relation`, `target` and `confidence`.
 * In this file it is built from the elements returned by `Sentence.openie()`.
 */
private case class OpenIE(subject: String, relation: String, target: String, confidence: Double) {
// Auxiliary constructor: copies the four components of a `Quadruple` into the fields,
// in order (first -> subject, second -> relation, third -> target, fourth -> confidence).
// The fourth component is a boxed java.lang.Double passed where a Scala Double is
// expected, so it is unboxed here.
// NOTE(review): a null fourth component would presumably fail on unboxing - confirm.
def this(quadruple: Quadruple[String, String, String, java.lang.Double]) =
this(quadruple.first, quadruple.second, quadruple.third, quadruple.fourth)
}
/**
 * Runs a local Spark job that reads the text source `"test"`, prints it, applies
 * `Sentence.openie()` to every line through a UDF, and prints the result.
 */
object StartApplication extends App {
  val spark = SparkSession.builder().appName("spark-nlp-starter").master("local[*]").getOrCreate()
  val sc = spark.sparkContext

  // The lines of the input are exposed through the column named "value" used below.
  val readPdfFile: Dataset[String] = spark.read.textFile("test")
  readPdfFile.show(false)

  // Defined as a `val` so the UDF is built once instead of on every reference.
  val openie = udf { sentence: String =>
    new Sentence(sentence).openie().asScala.map(q => new OpenIE(q)).toSeq
  }

  val res = readPdfFile.select(openie(readPdfFile("value")))
  res.show(false)

  // Release the local Spark resources once both tables have been printed.
  spark.stop()
}
开发者ID:shiv4nsh,项目名称:scala-spark-nlp-starter-kit,代码行数:30,代码来源:StartApplication.scala
示例3: setFunction
//设置package包名称以及导入依赖的类
package spark.feature
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.attribute.AttributeGroup
import org.apache.spark.ml.param.{ParamMap, _}
import org.apache.spark.ml.util._
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, UserDefinedFunction}
/**
 * Stores `value` as the value of the `function` param by delegating to `set`.
 *
 * @param value a function from `String` to `Double`
 * @return whatever `set` returns
 */
def setFunction(value: String=>Double) = set(function, value)
/** Returns the current value of the `function` param, read through `$`. */
def getFunction() = $(function)
/**
 * Appends the output column to `dataset`.
 *
 * Steps, as visible in this method:
 *  1. A temporary column "0" is filled with the value of the `expr` param for every row.
 *  2. For each input column name (in order, index i), temporary column i is rewritten into
 *     temporary column i + 1 by replacing every occurrence of the column's name with that
 *     row's value of the column (rendered via `toString`); column i is then dropped.
 *  3. The `function` param is applied to the last temporary column; the result is stored
 *     under `outputCol` with the metadata taken from `transformSchema`, and the last
 *     temporary column is dropped.
 *
 * @param dataset the input frame; it must contain every column named in `inputCols`
 * @return `dataset` plus the output column
 */
override def transform(dataset: DataFrame): DataFrame = {
  val outputSchema = transformSchema(dataset.schema)
  val metadata = outputSchema($(outputCol)).metadata

  // The UDF ignores its argument; the first input column is only passed so that the
  // expression is produced once per row. `$(expr)` is deliberately read inside the closure.
  val seedExpr = udf { x: Any => $(expr) }
  val seeded = dataset.select(col("*"), seedExpr(col($(inputCols).head)).as("0"))

  // UDF that substitutes one column name inside the expression text with the row's value.
  def replaceName(name: String) =
    udf((template: String, value: Double) => template.replace(name, value.toString))

  val substituted = $(inputCols).zipWithIndex.foldLeft(seeded) { case (frame, (name, idx)) =>
    val current = idx.toString
    val next = (idx + 1).toString
    frame
      .select(col("*"), replaceName(name)(frame(current), frame(name)).as(next))
      .drop(current)
  }

  val eval = udf($(function))
  val lastTemp = $(inputCols).length.toString
  substituted
    .select(col("*"), eval(substituted(lastTemp)).as($(outputCol), metadata))
    .drop(lastTemp)
}
/**
 * Computes the schema produced by this transformer: the input schema with one extra
 * field appended, built from an `AttributeGroup` named after `outputCol` and sized by
 * `numFeatures`.
 *
 * @param schema the input schema
 * @return the input fields followed by the output field
 * @throws IllegalArgumentException if the input schema already has a field with the output name
 */
override def transformSchema(schema: StructType): StructType = {
  // TODO: Assertions on inputCols
  val outputField = new AttributeGroup($(outputCol), $(numFeatures)).toStructField()
  val alreadyPresent = schema.fieldNames.contains(outputField.name)
  require(!alreadyPresent, s"Column ${outputField.name} already exists.")
  StructType(schema.fields :+ outputField)
}
/** Creates a copy of this transformer by delegating to `defaultCopy` with the given extra params. */
override def copy(extra: ParamMap): FeatureFuTransformer = defaultCopy(extra)
}
开发者ID:laxmanjangley,项目名称:FFrame,代码行数:38,代码来源:FeatureFuTransformer.scala
示例4: UDFTest
//设置package包名称以及导入依赖的类
package org.apache.spark.examples.sql
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.functions.udf
/**
 * Micro-benchmark that times a three-argument UDF over a Parquet data set.
 *
 * Arguments: `args(0)` is the Parquet path, `args(1)` is the value for
 * `spark.executor.memory`. The input must expose columns `a`, `b` and `c`.
 */
object UDFTest {

  /** Leading iterations excluded from the average (the first run also fills the cache). */
  private val WarmupRuns = 1

  /** Timed iterations that contribute to the reported average. */
  private val MeasuredRuns = 10

  /** Linear combination evaluated per row: `3*a + 2*b + c`. */
  def model(a: Long, b: Long, c: Long): Long = 3*a + 2*b + c

  /**
   * Runs the benchmark and prints the average wall-clock time in milliseconds.
   *
   * @param args `<parquet-path> <spark.executor.memory>`
   * @throws IllegalArgumentException if fewer than two arguments are supplied
   */
  def main(args: Array[String]): Unit = {
    require(args.length >= 2, "Usage: UDFTest <parquet-path> <spark.executor.memory>")

    System.setProperty("useNvl", "false")
    System.setProperty("offHeap", "false")
    System.setProperty("pythonNvl", "false")

    val sparkConf = new SparkConf().setAppName("UDFTest").set("spark.executor.memory", args(1))
    val sc = new SparkContext(sparkConf)
    try {
      val sqlContext = new SQLContext(sc)
      val m = udf(model(_:Long,_:Long,_:Long))
      val df = sqlContext.read.parquet(args(0)).cache()

      // One elapsed time (ms) per iteration, collected eagerly in execution order.
      val timings = (0 until WarmupRuns + MeasuredRuns).map { _ =>
        val start = System.currentTimeMillis
        df.withColumn("model", m(df("a"), df("b"), df("c"))).selectExpr("sum(model)").show()
        System.currentTimeMillis - start
      }

      println("average time: " + timings.drop(WarmupRuns).sum / MeasuredRuns.toDouble)
    } finally {
      // Always release the context, even when reading or evaluating fails.
      sc.stop()
    }
  }
}
开发者ID:jjthomas,项目名称:spark-nvl,代码行数:28,代码来源:UDFTest.scala
注:本文中的org.apache.spark.sql.functions.udf类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论