This article collects typical usage examples of the Scala class org.apache.spark.sql.Column. If you are wondering what the Column class is for, how to use it, or what it looks like in real code, the curated examples below should help.
Nineteen code examples of the Column class are shown below, ordered by popularity by default. Upvoting the examples you like or find useful helps the site recommend better Scala code examples.
Example 1: DateTimeColumn
// Package declaration and imported dependencies
package me.danielpes.spark.datetime
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.{DataType, DateType, TimestampType}
import org.apache.spark.sql.functions.{col, udf}
class DateTimeColumn(val col: Column, dataType: DataType = TimestampType) {
def +(p: Period): Column = dataType match {
case _: DateType => udf((d: java.sql.Date) => new RichDate(d) + p).apply(col)
case _: TimestampType => udf((ts: java.sql.Timestamp) => new RichDate(ts) + p).apply(col)
}
def -(p: Period): Column = this.+(-p)
override def toString: String = s"{column: ${col.toString}, type: ${dataType.toString}}"
}
object DateTimeColumn {
def apply(col: Column, dataType: DataType = TimestampType) = new DateTimeColumn(col, dataType)
def apply(col: Column, typeString: String) = new DateTimeColumn(col, typeFromString(typeString))
def apply(cName: String) = new DateTimeColumn(col(cName), TimestampType)
def apply(cName: String, dataType: DataType) = new DateTimeColumn(col(cName), dataType)
def apply(cName: String, typeString: String) = new DateTimeColumn(col(cName), typeFromString(typeString))
private def typeFromString(s: String): DataType = s match {
case "date" => DateType
case "timestamp" => TimestampType
}
}
Author: danielpes | Project: spark-datetime-lite | Lines: 32 | Source: DateTimeColumn.scala
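For context, the class above simply wraps a plain Scala function in a udf and applies it to a Column. A minimal sketch of that same pattern using only standard Spark APIs — not from the project — assuming a SparkSession named spark is in scope and with made-up data:
// Sketch of the udf-over-Column pattern behind DateTimeColumn.+ (assumes `spark: SparkSession` is in scope)
import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._

val df = Seq(("a", java.sql.Timestamp.valueOf("2010-01-01 00:00:00"))).toDF("id", "ts")
// Add one day by manipulating the Timestamp inside an ordinary Scala function wrapped in a udf.
val plusOneDay = udf((ts: java.sql.Timestamp) => new java.sql.Timestamp(ts.getTime + 24L * 3600 * 1000))
df.select(col("id"), plusOneDay(col("ts")).as("ts_plus_1_day")).show(false)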
Example 2: DataFrameFunctions
// Package declaration and imported dependencies
package com.bloomberg.sparkflow.dc
import org.apache.spark.sql.{Column, Dataset, Row}
class DataFrameFunctions(self: DC[Row]) {
def join(right: DC[Row]): DC[Row] = {
val f = (left: Dataset[_], right: Dataset[_]) => {
left.join(right)
}
val hashTarget = Seq("join")
new MultiDatasetTransformDC(self, right, f, hashTarget)
}
def join(right: DC[Row], usingColumn: String): DC[Row] = {
val f = (left: Dataset[_], right: Dataset[_]) => {
left.join(right, usingColumn)
}
val hashTarget = Seq("join", usingColumn)
new MultiDatasetTransformDC(self, right, f, hashTarget)
}
def join(right: DC[Row], joinExprs: Column): DC[Row] = join(right, joinExprs, "inner")
def join(right: DC[Row], joinExprs: Column, joinType: String): DC[Row] = {
val f = (left: Dataset[_], right: Dataset[_]) => {
left.join(right, joinExprs, joinType) // pass joinType through; otherwise the parameter would only affect hashTarget
}
val hashTarget = Seq("join", joinType, joinExprs.toString())
new MultiDatasetTransformDC(self, right, f, hashTarget)
}
}
Author: bloomberg | Project: spark-flow | Lines: 36 | Source: DataFrameFunctions.scala
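The DC wrapper above ultimately delegates to Dataset.join. As a reference point, the three underlying join forms look like this — a sketch with illustrative column names, assuming spark is in scope:
// Sketch of the Dataset.join variants wrapped by DataFrameFunctions (assumes `spark: SparkSession` is in scope)
import spark.implicits._

val left  = Seq((1, "a"), (2, "b")).toDF("id", "l_val")
val right = Seq((1, "x"), (3, "y")).toDF("id", "r_val")

left.join(right)                                            // no condition: a cartesian product (cross joins must be enabled to execute)
left.join(right, "id")                                      // join using a common column name
left.join(right, left("id") === right("id"), "left_outer")  // join expression plus an explicit join type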
Example 3: Titanic
// Package declaration and imported dependencies
package fr.ippon.spark.ml
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.{functions, Column, DataFrame, SQLContext}
object Titanic {
// Reads a Titanic CSV file into a DataFrame
def dataframeFromTitanicFile(sqlc: SQLContext, file: String): DataFrame = sqlc.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load(file)
// Computes the mean age for the given column
def calcMeanAge(df: DataFrame, inputCol: String): Double = df
.agg(functions.avg(df(inputCol)))
.head
.getDouble(0)
// Returns the age when present, or the replacement value (e.g. the mean age) otherwise
def fillMissingAge(df: DataFrame, inputCol: String, outputCol: String, replacementValue: Double): DataFrame = {
val ageValue: (Any) => Double = age => age match {
case age: Double => age
case _ => replacementValue
}
df.withColumn(outputCol, functions.callUDF(ageValue, DoubleType, df(inputCol)))
}
}
Author: ippontech | Project: spark-bbl-prez | Lines: 31 | Source: Titanic.scala
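Note that functions.callUDF with a Scala function argument is a Spark 1.x API. A rough modern equivalent of fillMissingAge can use DataFrameNaFunctions instead of a udf — a sketch, not from the project, assuming the input column is a double column with nulls for missing ages:
// Sketch: fill missing ages with the column mean via na.fill (assumes `spark: SparkSession` is in scope)
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.avg

def fillMissingAgeModern(df: DataFrame, inputCol: String): DataFrame = {
  val meanAge = df.agg(avg(df(inputCol))).head.getDouble(0)  // same mean computation as calcMeanAge
  df.na.fill(Map(inputCol -> meanAge))                       // replace nulls in that column with the mean
}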
Example 4: ImplicitsSuite
// Package declaration and imported dependencies
package me.danielpes.spark.datetime
import org.apache.spark.sql.Column
import org.scalatest.FlatSpec
class ImplicitsSuite extends FlatSpec {
"this" should "implicitly convert Ints, Longs and Dates" in {
// Given
val intVal: Int = 15
val longVal: Long = 150L
val dateVal: java.sql.Date = java.sql.Date.valueOf("2010-01-01")
val columnVal: Column = new Column("a_column")
// When
import implicits._
val richIntVal: RichInt = intVal
val richLongVal: RichLong = longVal
val richDateVal: RichDate[java.sql.Date] = dateVal
val richColumnVal: RichColumn = columnVal
// Then
assert(richIntVal.value == intVal)
assert(richLongVal.value == longVal)
assert(richDateVal.datetime == dateVal)
assert(richColumnVal.column == columnVal)
}
}
Author: danielpes | Project: spark-datetime-lite | Lines: 29 | Source: ImplicitsSuite.scala
Example 5: ColumnColumnConstraint
// Package declaration and imported dependencies
package be.dataminded.wharlord.constraints
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.Try
case class ColumnColumnConstraint(constraintColumn: Column) extends Constraint {
val fun = (df: DataFrame) => {
val maybeFailingRows = Try {
val succeedingRows = df.filter(constraintColumn).count
df.count - succeedingRows
}
ColumnColumnConstraintResult(
constraint = this,
data = maybeFailingRows.toOption.map(ColumnColumnConstraintResultData),
status = tryToStatus[Long](maybeFailingRows, _ == 0)
)
}
}
case class ColumnColumnConstraintResult(constraint: ColumnColumnConstraint,
data: Option[ColumnColumnConstraintResultData],
status: ConstraintStatus)
extends ConstraintResult[ColumnColumnConstraint] {
val message: String = createColumnConstraintMessage(
status = status,
constraintResult = this,
constraintString = constraint.constraintColumn.toString,
maybeViolatingRows = data.map(_.failedRows)
)
}
case class ColumnColumnConstraintResultData(failedRows: Long)
Author: datamindedbe | Project: wharlord | Lines: 38 | Source: ColumnColumnConstraint.scala
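Stripped of the wharlord wrappers, the check is simply "how many rows fail a boolean Column". A standalone sketch, with spark assumed in scope and made-up column names:
// Sketch of the underlying check: rows violating an arbitrary boolean Column expression
import org.apache.spark.sql.functions.col
import spark.implicits._

val df = Seq((1, 2), (5, 3), (2, 2)).toDF("a", "b")
val constraintColumn = col("a") < col("b")                      // any boolean Column works here
val failingRows = df.count - df.filter(constraintColumn).count  // 2: rows (5,3) and (2,2) do not satisfy a < b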
Example 6: AnyOfConstraint
// Package declaration and imported dependencies
package be.dataminded.wharlord.constraints
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.Try
case class AnyOfConstraint(columnName: String, allowedValues: Set[Any]) extends Constraint {
val fun = (df: DataFrame) => {
val maybeError = Try(df.select(new Column(columnName))) // check if column is not ambiguous
val maybeColumnIndex = maybeError.map(_ => df.columns.indexOf(columnName))
val maybeNotAllowedCount = maybeColumnIndex.map(columnIndex => df.rdd.filter(row => !row.isNullAt(columnIndex) &&
!allowedValues.contains(row.get(columnIndex))).count)
AnyOfConstraintResult(
constraint = this,
data = maybeNotAllowedCount.toOption.map(AnyOfConstraintResultData),
status = tryToStatus[Long](maybeNotAllowedCount, _ == 0)
)
}
}
case class AnyOfConstraintResult(constraint: AnyOfConstraint,
data: Option[AnyOfConstraintResultData],
status: ConstraintStatus) extends ConstraintResult[AnyOfConstraint] {
val message: String = {
val allowed = constraint.allowedValues
val columnName = constraint.columnName
val maybeFailedRows = data.map(_.failedRows)
val maybePluralSAndVerb = maybeFailedRows.map(failedRows => if (failedRows == 1) ("", "is") else ("s", "are"))
(status, maybeFailedRows, maybePluralSAndVerb) match {
case (ConstraintSuccess, Some(0), Some((pluralS, verb))) =>
s"Column $columnName contains only values in $allowed."
case (ConstraintFailure, Some(failedRows), Some((pluralS, verb))) =>
s"Column $columnName contains $failedRows row$pluralS that $verb not in $allowed."
case (ConstraintError(throwable), None, None) =>
s"Checking whether column $columnName contains only values in $allowed failed: $throwable"
case _ => throw IllegalConstraintResultException(this)
}
}
}
case class AnyOfConstraintResultData(failedRows: Long)
Author: datamindedbe | Project: wharlord | Lines: 44 | Source: AnyOfConstraint.scala
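The same check can also be written with Column.isin instead of the RDD path used above — a sketch with illustrative data, assuming spark is in scope:
// Sketch: count non-null values outside an allowed set using Column.isin
import org.apache.spark.sql.functions.col
import spark.implicits._

val allowed = Seq("red", "green", "blue")
val df = Seq("red", "purple", null, "blue").toDF("colour")
val notAllowedCount = df.filter(col("colour").isNotNull && !col("colour").isin(allowed: _*)).count
// notAllowedCount == 1 ("purple"); nulls are ignored, matching the constraint's semantics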
Example 7: UniqueKeyConstraint
// Package declaration and imported dependencies
package be.dataminded.wharlord.constraints
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.Try
case class UniqueKeyConstraint(columnNames: Seq[String]) extends Constraint {
require(columnNames.nonEmpty)
val fun = (df: DataFrame) => {
val columns = columnNames.map(name => new Column(name))
val maybeNonUniqueRows = Try(df.groupBy(columns: _*).count.filter(new Column("count") > 1).count)
UniqueKeyConstraintResult(
constraint = this,
data = maybeNonUniqueRows.toOption.map(UniqueKeyConstraintResultData),
status = tryToStatus[Long](maybeNonUniqueRows, _ == 0)
)
}
}
case class UniqueKeyConstraintResult(constraint: UniqueKeyConstraint,
data: Option[UniqueKeyConstraintResultData],
status: ConstraintStatus) extends ConstraintResult[UniqueKeyConstraint] {
val message: String = {
val columnNames = constraint.columnNames
val columnsString = columnNames.mkString(", ")
val isPlural = columnNames.length > 1
val columnNoun = "Column" + (if (isPlural) "s" else "")
val columnVerb = if (isPlural) "are" else "is"
val maybeNumNonUniqueTuples = data.map(_.numNonUniqueTuples)
val maybePluralS = maybeNumNonUniqueTuples.map(numNonUniqueTuples => if (numNonUniqueTuples != 1) "s" else "")
(status, maybeNumNonUniqueTuples, maybePluralS) match {
case (ConstraintSuccess, Some(0), _) =>
s"$columnNoun $columnsString $columnVerb a key."
case (ConstraintFailure, Some(numNonUniqueTuples), Some(pluralS)) =>
s"$columnNoun $columnsString $columnVerb not a key ($numNonUniqueTuples non-unique tuple$pluralS)."
case (ConstraintError(throwable), None, None) =>
s"Checking whether ${columnNoun.toLowerCase()} $columnsString $columnVerb a key failed: $throwable"
case default => throw IllegalConstraintResultException(this)
}
}
}
case class UniqueKeyConstraintResultData(numNonUniqueTuples: Long)
Author: datamindedbe | Project: wharlord | Lines: 49 | Source: UniqueKeyConstraint.scala
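The heart of the uniqueness check is a groupBy on the candidate key followed by filtering counts greater than one — a standalone sketch with made-up data, assuming spark is in scope:
// Sketch: count key combinations that occur more than once
import org.apache.spark.sql.functions.col
import spark.implicits._

val df = Seq((1, "a"), (1, "a"), (2, "b")).toDF("k1", "k2")
val nonUnique = df.groupBy(col("k1"), col("k2")).count.filter(col("count") > 1).count
// nonUnique == 1: the key (1, "a") appears twice, so (k1, k2) is not a unique key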
Example 8: FunctionalDependencyConstraint
// Package declaration and imported dependencies
package be.dataminded.wharlord.constraints
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.Try
case class FunctionalDependencyConstraint(determinantSet: Seq[String], dependentSet: Seq[String]) extends Constraint {
require(determinantSet.nonEmpty, "determinantSet must not be empty")
require(dependentSet.nonEmpty, "dependentSet must not be empty")
val fun = (df: DataFrame) => {
val determinantColumns = determinantSet.map(columnName => new Column(columnName))
val dependentColumns = dependentSet.map(columnName => new Column(columnName))
val maybeRelevantSelection = Try(df.select(determinantColumns ++ dependentColumns: _*))
val maybeDeterminantValueCounts = maybeRelevantSelection.map(_.distinct.groupBy(determinantColumns: _*).count)
val maybeViolatingDeterminantValuesCount =
maybeDeterminantValueCounts.map(_.filter(new Column("count") =!= 1).count)
FunctionalDependencyConstraintResult(
constraint = this,
data = maybeViolatingDeterminantValuesCount.toOption.map(FunctionalDependencyConstraintResultData),
status = tryToStatus[Long](maybeViolatingDeterminantValuesCount, _ == 0)
)
}
}
case class FunctionalDependencyConstraintResult(constraint: FunctionalDependencyConstraint,
data: Option[FunctionalDependencyConstraintResultData],
status: ConstraintStatus)
extends ConstraintResult[FunctionalDependencyConstraint] {
val message: String = {
val maybeFailedRows = data.map(_.failedRows)
val maybeRowPluralS = maybeFailedRows.map(failedRows => if (failedRows == 1) "" else "s")
val dependentSet = constraint.dependentSet
val determinantString = s"${constraint.determinantSet.mkString(", ")}"
val dependentString = s"${dependentSet.mkString(", ")}"
val (columnPluralS, columnVerb) = if (dependentSet.size == 1) ("", "is") else ("s", "are")
(status, maybeFailedRows, maybeRowPluralS) match {
case (ConstraintSuccess, Some(0), _) =>
s"Column$columnPluralS $dependentString $columnVerb functionally dependent on $determinantString."
case (ConstraintFailure, Some(failedRows), Some(rowPluralS)) =>
s"Column$columnPluralS $dependentString $columnVerb not functionally dependent on " +
s"$determinantString ($failedRows violating determinant value$rowPluralS)."
case (ConstraintError(throwable), None, None) =>
s"Checking whether column$columnPluralS $dependentString $columnVerb functionally " +
s"dependent on $determinantString failed: $throwable"
case default => throw IllegalConstraintResultException(this)
}
}
}
case class FunctionalDependencyConstraintResultData(failedRows: Long)
Author: datamindedbe | Project: wharlord | Lines: 56 | Source: FunctionalDependencyConstraint.scala
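Viewed as plain Spark, the constraint asks whether every determinant value maps to exactly one distinct dependent value — a sketch with illustrative data, assuming spark is in scope:
// Sketch: determinant values that map to more than one distinct dependent value
import org.apache.spark.sql.functions.col
import spark.implicits._

val df = Seq(("BE", "Brussels"), ("BE", "Bruxelles"), ("FR", "Paris")).toDF("country", "capital")
val violating = df.select(col("country"), col("capital"))
  .distinct                        // keep one row per (determinant, dependent) pair
  .groupBy(col("country")).count   // how many distinct dependent values per determinant
  .filter(col("count") =!= 1).count
// violating == 1: "BE" maps to two different capitals, so capital is not functionally dependent on country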
Example 9: NumberOfRowsConstraint
// Package declaration and imported dependencies
package be.dataminded.wharlord.constraints
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.{Column, DataFrame}
case class NumberOfRowsConstraint(expected: Column) extends Constraint {
val fun: (DataFrame) => NumberOfRowsConstraintResult = (df: DataFrame) => {
val countDf = df.agg(count(new Column("*")).as(NumberOfRowsConstraint.countKey))
val actual = countDf.collect().map(_.getLong(0)).apply(0)
val satisfied = countDf.select(expected).collect().map(_.getBoolean(0)).apply(0)
NumberOfRowsConstraintResult(
constraint = this,
actual = actual,
status = if (satisfied) ConstraintSuccess else ConstraintFailure
)
}
}
object NumberOfRowsConstraint {
private[constraints] val countKey: String = "count"
def apply(expected: Column => Column): NumberOfRowsConstraint = {
new NumberOfRowsConstraint(expected(new Column(countKey)))
}
}
case class NumberOfRowsConstraintResult(constraint: NumberOfRowsConstraint,
actual: Long,
status: ConstraintStatus) extends ConstraintResult[NumberOfRowsConstraint] {
val message: String = {
val expected = constraint.expected
status match {
case ConstraintSuccess => s"The number of rows satisfies $expected."
case ConstraintFailure => s"The actual number of rows $actual does not satisfy $expected."
case _ => throw IllegalConstraintResultException(this)
}
}
}
Author: datamindedbe | Project: wharlord | Lines: 45 | Source: NumberOfRowsConstraint.scala
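The companion apply accepts a Column => Column, so the expectation is just a predicate over a count column. Written directly against Spark, with made-up data and spark assumed in scope, the underlying check looks like this:
// Sketch of the underlying check: evaluate a boolean expression over the row count
import org.apache.spark.sql.functions.{col, count}
import spark.implicits._

val df = Seq(1, 2, 3).toDF("x")
val countDf = df.agg(count(col("*")).as("count"))
val actual = countDf.collect().head.getLong(0)                                   // 3
val satisfied = countDf.select(col("count") >= 2).collect().head.getBoolean(0)   // true
// NumberOfRowsConstraint(_ >= 2) builds exactly this kind of predicate via its companion apply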
Example 10: RegexConstraint
// Package declaration and imported dependencies
package be.dataminded.wharlord.constraints
import java.util.regex.Pattern
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.Try
case class RegexConstraint(columnName: String, regex: String) extends Constraint {
val fun = (df: DataFrame) => {
val pattern = Pattern.compile(regex)
val doesNotMatch = udf((column: String) => column != null && !pattern.matcher(column).find())
val maybeDoesNotMatchCount = Try(df.filter(doesNotMatch(new Column(columnName))).count)
RegexConstraintResult(
constraint = this,
data = maybeDoesNotMatchCount.toOption.map(RegexConstraintResultData),
status = tryToStatus[Long](maybeDoesNotMatchCount, _ == 0)
)
}
}
case class RegexConstraintResult(constraint: RegexConstraint,
data: Option[RegexConstraintResultData],
status: ConstraintStatus) extends ConstraintResult[RegexConstraint] {
val message: String = {
val columnName = constraint.columnName
val regex = constraint.regex
val maybeFailedRows = data.map(_.failedRows)
val maybePluralSAndVerb = maybeFailedRows.map(failedRows => if (failedRows == 1) ("", "does") else ("s", "do"))
(status, maybeFailedRows, maybePluralSAndVerb) match {
case (ConstraintSuccess, Some(0), _) =>
s"Column $columnName matches $regex"
case (ConstraintFailure, Some(failedRows), Some((pluralS, verb))) =>
s"Column $columnName contains $failedRows row$pluralS that $verb not match $regex"
case (ConstraintError(throwable), None, None) =>
s"Checking whether column $columnName matches $regex failed: $throwable"
case default => throw IllegalConstraintResultException(this)
}
}
}
case class RegexConstraintResultData(failedRows: Long)
Author: datamindedbe | Project: wharlord | Lines: 48 | Source: RegexConstraint.scala
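For many patterns the same check can be expressed without a udf via Column.rlike, which, like Matcher.find above, does unanchored matching — a sketch with illustrative data, assuming spark is in scope:
// Sketch: count non-null values that do not match a regex, using rlike instead of a udf
import org.apache.spark.sql.functions.col
import spark.implicits._

val df = Seq("abc-123", "nope", null).toDF("code")
val nonMatching = df.filter(col("code").isNotNull && !col("code").rlike("""\d{3}""")).count
// nonMatching == 1 ("nope"); nulls are not counted as failures, matching the udf's semantics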
Example 11: AlwaysNullConstraint
// Package declaration and imported dependencies
package be.dataminded.wharlord.constraints
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.Try
case class AlwaysNullConstraint(columnName: String) extends Constraint {
override val fun: (DataFrame) => AlwaysNullConstraintResult = (df: DataFrame) => {
val tryNotNullCount = Try(df.filter(new Column(columnName).isNotNull).count)
AlwaysNullConstraintResult(
constraint = this,
status = tryToStatus[Long](tryNotNullCount, _ == 0),
data = tryNotNullCount.toOption.map(AlwaysNullConstraintResultData)
)
}
}
case class AlwaysNullConstraintResult(constraint: AlwaysNullConstraint,
status: ConstraintStatus,
data: Option[AlwaysNullConstraintResultData])
extends ConstraintResult[AlwaysNullConstraint] {
val message: String = {
val columnName = constraint.columnName
val maybeNonNullRows = data.map(_.nonNullRows)
val maybePluralS = maybeNonNullRows.map(n => if (n == 1) "" else "s")
(status, maybeNonNullRows, maybePluralS) match {
case (ConstraintError(throwable), None, None) =>
s"Checking column $columnName for being always null failed: $throwable"
case (ConstraintSuccess, Some(0), Some(_)) =>
s"Column $columnName is always null."
case (ConstraintFailure, Some(nonNullRows), Some(pluralS)) =>
s"Column $columnName contains $nonNullRows non-null row$pluralS (should always be null)."
case _ => throw IllegalConstraintResultException(this)
}
}
}
case class AlwaysNullConstraintResultData(nonNullRows: Long)
Author: datamindedbe | Project: wharlord | Lines: 43 | Source: AlwaysNullConstraint.scala
Example 12: NeverNullConstraint
// Package declaration and imported dependencies
package be.dataminded.wharlord.constraints
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.Try
case class NeverNullConstraint(columnName: String) extends Constraint {
val fun: (DataFrame) => NeverNullConstraintResult = (df: DataFrame) => {
val tryNullCount = Try(df.filter(new Column(columnName).isNull).count)
NeverNullConstraintResult(
constraint = this,
data = tryNullCount.toOption.map(NeverNullConstraintResultData),
status = tryToStatus[Long](tryNullCount, _ == 0)
)
}
}
case class NeverNullConstraintResult(constraint: NeverNullConstraint,
data: Option[NeverNullConstraintResultData],
status: ConstraintStatus)
extends ConstraintResult[NeverNullConstraint] {
val message: String = {
val columnName = constraint.columnName
val maybeNullRows = data.map(_.nullRows)
val maybePluralS = maybeNullRows.map(nullRows => if (nullRows == 1) "" else "s")
val maybeVerb = maybeNullRows.map(nullRows => if (nullRows == 1) "is" else "are")
(status, maybeNullRows, maybePluralS, maybeVerb) match {
case (ConstraintSuccess, Some(0), Some(pluralS), Some(verb)) =>
s"Column $columnName is never null."
case (ConstraintFailure, Some(nullRows), Some(pluralS), Some(verb)) =>
s"Column $columnName contains $nullRows row$pluralS that $verb null (should never be null)."
case (ConstraintError(throwable), None, None, None) =>
s"Checking column $columnName for being never null failed: $throwable"
case _ => throw IllegalConstraintResultException(this)
}
}
}
case class NeverNullConstraintResultData(nullRows: Long)
Author: datamindedbe | Project: wharlord | Lines: 41 | Source: NeverNullConstraint.scala
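Both AlwaysNullConstraint (Example 11) and NeverNullConstraint reduce to a single filtered count — a combined sketch with made-up data, assuming spark is in scope:
// Sketch: the two null checks behind AlwaysNullConstraint and NeverNullConstraint
import org.apache.spark.sql.functions.col
import spark.implicits._

val df = Seq(("a", "1"), ("b", null)).toDF("id", "x")
val nonNullRows = df.filter(col("x").isNotNull).count   // 1 -> AlwaysNullConstraint fails unless this is 0
val nullRows    = df.filter(col("x").isNull).count      // 1 -> NeverNullConstraint fails unless this is 0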
Example 13: ConditionalColumnConstraint
// Package declaration and imported dependencies
package be.dataminded.wharlord.constraints
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.Try
case class ConditionalColumnConstraint(statement: Column, implication: Column) extends Constraint {
val fun = (df: DataFrame) => {
val maybeFailingRows = Try {
val succeedingRows = df.filter(!statement || implication).count
df.count - succeedingRows
}
ConditionalColumnConstraintResult(
constraint = this,
data = maybeFailingRows.toOption.map(ConditionalColumnConstraintResultData),
status = tryToStatus[Long](maybeFailingRows, _ == 0)
)
}
}
case class ConditionalColumnConstraintResult(constraint: ConditionalColumnConstraint,
data: Option[ConditionalColumnConstraintResultData],
status: ConstraintStatus) extends ConstraintResult[ConditionalColumnConstraint] {
val message: String = createColumnConstraintMessage(
status = status,
constraintResult = this,
constraintString = s"${constraint.statement} -> ${constraint.implication}",
maybeViolatingRows = data.map(_.failedRows)
)
}
case class ConditionalColumnConstraintResultData(failedRows: Long)
Author: datamindedbe | Project: wharlord | Lines: 37 | Source: ConditionalColumnConstraint.scala
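The implication "statement implies implication" is encoded as !statement || implication, so the failing rows are those where the statement holds but the implication does not — a sketch with illustrative data, assuming spark is in scope:
// Sketch: rows violating a logical implication between two boolean Columns
import org.apache.spark.sql.functions.col
import spark.implicits._

val df = Seq(("BE", "EUR"), ("BE", "USD"), ("US", "USD")).toDF("country", "currency")
val statement   = col("country") === "BE"
val implication = col("currency") === "EUR"
val failingRows = df.count - df.filter(!statement || implication).count
// failingRows == 1: the ("BE", "USD") row satisfies the statement but not the implication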
Example 14: DateFormatConstraint
// Package declaration and imported dependencies
package be.dataminded.wharlord.constraints
import java.text.SimpleDateFormat
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.Try
case class DateFormatConstraint(columnName: String,
format: SimpleDateFormat) extends Constraint {
val fun = (df: DataFrame) => {
val cannotBeDate = udf((column: String) => column != null && Try(format.parse(column)).isFailure)
val maybeCannotBeDateCount = Try(df.filter(cannotBeDate(new Column(columnName))).count)
DateFormatConstraintResult(
this,
data = maybeCannotBeDateCount.toOption.map(DateFormatConstraintResultData),
status = tryToStatus[Long](maybeCannotBeDateCount, _ == 0)
)
}
}
case class DateFormatConstraintResult(constraint: DateFormatConstraint,
data: Option[DateFormatConstraintResultData],
status: ConstraintStatus) extends ConstraintResult[DateFormatConstraint] {
val message: String = {
val format = constraint.format.toPattern
val columnName = constraint.columnName
val maybeFailedRows = data.map(_.failedRows)
val maybePluralS = maybeFailedRows.map(failedRows => if (failedRows == 1) "" else "s")
val maybeVerb = maybeFailedRows.map(failedRows => if (failedRows == 1) "is" else "are")
(status, maybeFailedRows, maybePluralS, maybeVerb) match {
case (ConstraintSuccess, Some(0), _, _) =>
s"Column $columnName is formatted by $format."
case (ConstraintFailure, Some(failedRows), Some(pluralS), Some(verb)) =>
s"Column $columnName contains $failedRows row$pluralS that $verb not formatted by $format."
case (ConstraintError(throwable), None, None, None) =>
s"Checking whether column $columnName is formatted by $format failed: $throwable"
case default => throw IllegalConstraintResultException(this)
}
}
}
case class DateFormatConstraintResultData(failedRows: Long)
Author: datamindedbe | Project: wharlord | Lines: 50 | Source: DateFormatConstraint.scala
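For fixed patterns the same check can often be done without a udf by parsing with to_date (available since Spark 2.2), which yields null when parsing fails. A sketch with made-up data, noting that this variant takes a pattern string rather than a SimpleDateFormat instance:
// Sketch: count non-null values that cannot be parsed with a given date pattern
import org.apache.spark.sql.functions.{col, to_date}
import spark.implicits._

val df = Seq("2020-01-31", "not a date", null).toDF("event_date")
val notADate = df.filter(col("event_date").isNotNull && to_date(col("event_date"), "yyyy-MM-dd").isNull).count
// notADate == 1 ("not a date" does not match the yyyy-MM-dd pattern); nulls are not counted as failures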
Example 15: TypeConversionConstraint
// Package declaration and imported dependencies
package be.dataminded.wharlord.constraints
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.Try
case class TypeConversionConstraint(columnName: String,
convertedType: DataType) extends Constraint {
val fun = (df: DataFrame) => {
val originalColumn = new Column(columnName)
val castedColumnName = columnName + "_casted"
val maybeCasted = Try(df.select(originalColumn, originalColumn.cast(convertedType).as(castedColumnName)))
val maybeFailedCastsAndOriginalType = maybeCasted.map(casted => {
val failedCastsCount = casted.filter(new Column(castedColumnName).isNull && originalColumn.isNotNull).count
val originalType = df.schema.find(_.name == columnName).get.dataType
(failedCastsCount, originalType)
})
TypeConversionConstraintResult(
constraint = this,
data = maybeFailedCastsAndOriginalType.toOption.map{ case (failedCastsCount, originalType) =>
TypeConversionConstraintResultData(
originalType = originalType,
failedRows = failedCastsCount
)
},
status = tryToStatus[Long](maybeFailedCastsAndOriginalType.map{
case (failedCastsCount, originalType) => failedCastsCount
}, _ == 0)
)
}
}
case class TypeConversionConstraintResult(constraint: TypeConversionConstraint,
data: Option[TypeConversionConstraintResultData],
status: ConstraintStatus) extends ConstraintResult[TypeConversionConstraint] {
val message: String = {
val convertedType = constraint.convertedType
val columnName = constraint.columnName
val maybePluralSVerb = data.map(data => if (data.failedRows == 1) ("", "is") else ("s", "are"))
(status, data, maybePluralSVerb) match {
case (ConstraintSuccess, Some(TypeConversionConstraintResultData(originalType, 0)), _) =>
s"Column $columnName can be converted from $originalType to $convertedType."
case (ConstraintFailure, Some(TypeConversionConstraintResultData(originalType, failedRows)), Some((pluralS, verb))) =>
s"Column $columnName cannot be converted from $originalType to $convertedType. " +
s"$failedRows row$pluralS could not be converted."
case (ConstraintError(throwable), None, None) =>
s"Checking whether column $columnName can be converted to $convertedType failed: $throwable"
case default => throw IllegalConstraintResultException(this)
}
}
}
case class TypeConversionConstraintResultData(originalType: DataType, failedRows: Long)
Author: datamindedbe | Project: wharlord | Lines: 59 | Source: TypeConversionConstraint.scala
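The trick the constraint relies on is that a failed cast produces null, so convertibility is measured by counting rows where the cast is null but the original value is not — a sketch with illustrative data, assuming spark is in scope:
// Sketch: count values that cannot be cast to a target type (failed casts become null)
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType
import spark.implicits._

val df = Seq("42", "oops", null).toDF("raw")
val casted = df.select(col("raw"), col("raw").cast(IntegerType).as("raw_casted"))
val failedCasts = casted.filter(col("raw_casted").isNull && col("raw").isNotNull).count
// failedCasts == 1 ("oops"); the null row is not counted because it was already null before the cast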
Example 16: Utils
// Package declaration and imported dependencies
package org.alghimo.sparkPipelines
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.expr
import scala.collection.SeqLike
object Utils {
def hashId(str: String): Long = {
// This is effectively the same implementation as in Guava's Hashing, but 'inlined'
// to avoid a dependency on Guava just for this. It creates a long from the first 8 bytes
// of the (16 byte) MD5 hash, with first byte as least-significant byte in the long.
val bytes = MessageDigest.getInstance("MD5").digest(str.getBytes(StandardCharsets.UTF_8))
(bytes(0) & 0xFFL) |
((bytes(1) & 0xFFL) << 8) |
((bytes(2) & 0xFFL) << 16) |
((bytes(3) & 0xFFL) << 24) |
((bytes(4) & 0xFFL) << 32) |
((bytes(5) & 0xFFL) << 40) |
((bytes(6) & 0xFFL) << 48) |
((bytes(7) & 0xFFL) << 56)
}
def columnInList[Repr](col: String, existingColumns: SeqLike[String, Repr]): Column = {
if (existingColumns.contains(col)) {
new Column(col)
} else {
expr("NULL").as(col.toString)
}
}
def withColor(str: String, color: String = Console.GREEN) = {
color + str + Console.WHITE
}
}
Author: alghimo | Project: spark-pipelines | Lines: 41 | Source: Utils.scala
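A hedged usage sketch for both helpers; the DataFrame, its columns, and the requested column list are made up, and spark is assumed in scope:
// Sketch: using hashId and columnInList (column names are illustrative)
import org.alghimo.sparkPipelines.Utils
import spark.implicits._

val id: Long = Utils.hashId("customer-42")   // deterministic 64-bit id derived from an MD5 prefix

val df = Seq(("alice", 34)).toDF("name", "age")
// Select a fixed set of columns, substituting NULL for any column the DataFrame does not have.
val wanted = Seq("name", "age", "country")
val selected = df.select(wanted.map(c => Utils.columnInList(c, df.columns.toSeq)): _*)
// "country" does not exist in df, so it comes back as a NULL column aliased "country"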
Example 17: dimensionTableName
// Package declaration and imported dependencies
package org.alghimo.spark.dimensionalModelling
import org.apache.spark.sql.{Column, Dataset, Encoder, Encoders}
// Note: this listing is an excerpt from a trait in DimensionTableProvider.scala; the enclosing trait
// declaration and members such as `spark`, the `DIM` type and `isCurrentColumnName` are not shown here.
def dimensionTableName: String
def tmpDimensionTableName: String = {
val Array(dbName, tableName) = dimensionTableName.split('.')
s"${dbName}.tmp_${tableName}"
}
def maxPartitionsInDimensionTable: Int = 400
def dimensionTable(refresh: Boolean = false): Dataset[DIM] = {
if (refresh) {
spark.catalog.refreshTable(dimensionTableName)
}
spark.table(dimensionTableName).as[DIM]
}
def currentDimensions(refresh: Boolean = false): Dataset[DIM] = dimensionTable(refresh).filter(s"${isCurrentColumnName}")
def notCurrentDimensions(refresh: Boolean = false): Dataset[DIM] = dimensionTable(refresh).filter(s"NOT ${isCurrentColumnName}")
def save(ds: Dataset[DIM], useTempTable: Boolean = true): Dataset[DIM] = {
println("Saving dimensions..")
val toSave = if (useTempTable) {
ds
.coalesce(maxPartitionsInDimensionTable)
.write
.mode("overwrite")
.saveAsTable(tmpDimensionTableName)
spark.table(tmpDimensionTableName)
} else {
ds
}
toSave
.coalesce(maxPartitionsInDimensionTable)
.write
.mode("overwrite")
.saveAsTable(dimensionTableName)
if (useTempTable) {
spark.sql(s"DROP TABLE ${tmpDimensionTableName} PURGE")
}
dimensionTable(refresh = true)
}
}
Author: alghimo | Project: spark-dimensional-modelling | Lines: 54 | Source: DimensionTableProvider.scala
Example 18: MeasStatistics
// Package declaration and imported dependencies
package com.epidata.spark.ops
import com.epidata.lib.models.StatsSummary
import org.apache.spark.sql.{ SQLContext, Column, DataFrame }
import org.apache.spark.sql.functions._
class MeasStatistics(
val meas_names: List[String],
val method: String
) extends Transformation {
override def apply(dataFrame: DataFrame, sqlContext: SQLContext): DataFrame = {
method match {
case "standard" =>
dataFrame.registerTempTable("df_tmp")
val row = sqlContext.sql(s"SELECT min(ts) as min, max(ts) as max FROM df_tmp").first()
val startTime = row.getTimestamp(0)
val stopTime = row.getTimestamp(1)
val mapping: Map[String, Column => Column] = Map(
"min" -> min, "max" -> max, "mean" -> mean, "count" -> count, "std" -> stddev
)
val groupBy = Seq("customer", "customer_site", "collection", "dataset", "start_time", "stop_time", "key1", "key2", "key3")
val aggregate = Seq("meas_value")
val operations = Seq("min", "max", "mean", "count", "std")
val exprs = aggregate.flatMap(c => operations.map(f => mapping(f)(col(c)).alias(f)))
val describeUDF = udf((min: Float, max: Float, mean: Float, count: Long, std: Float) =>
StatsSummary(min, max, mean, count, std).toJson)
val df = dataFrame
.withColumn("start_time", lit(startTime))
.withColumn("stop_time", lit(stopTime))
.filter(dataFrame("key2").isin(meas_names: _*) || dataFrame("key3").isin(meas_names: _*) || dataFrame("key1").isin(meas_names: _*))
.groupBy(groupBy.map(col): _*)
.agg(exprs.head, exprs.tail: _*)
.withColumn("meas_summary_description", lit("descriptive statistics"))
.withColumn("meas_summary_name", lit("statistics"))
.withColumn("meas_summary_value", describeUDF(col("min"), col("max"), col("mean"), col("count"), col("std")))
.drop(operations: _*)
df
case _ => throw new Exception("Unsupported statistics method: " + method)
}
}
override def destination: String = "measurements_summary"
}
Author: epidataio | Project: epidata-community | Lines: 52 | Source: MeasStatistics.scala
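The reusable idea here is building aggregation expressions from a name-to-function mapping and splatting them into agg(exprs.head, exprs.tail: _*) — a standalone sketch with made-up sensor data, assuming spark is in scope:
// Sketch: build a list of aggregation expressions from a name -> function mapping
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, count, max, mean, min, stddev}
import spark.implicits._

val df = Seq(("s1", 1.0), ("s1", 3.0), ("s2", 2.0)).toDF("sensor", "meas_value")
val mapping: Map[String, Column => Column] =
  Map("min" -> min, "max" -> max, "mean" -> mean, "count" -> count, "std" -> stddev)
val operations = Seq("min", "max", "mean", "count", "std")
val exprs = operations.map(op => mapping(op)(col("meas_value")).alias(op))
df.groupBy(col("sensor")).agg(exprs.head, exprs.tail: _*).show()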
Example 19: ColumnExt
// Package declaration and imported dependencies
package com.github.mrpowers.spark.daria.sql
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
object ColumnExt {
implicit class ColumnMethods(col: Column) {
def chain(colMethod: (Column => Column)): Column = {
colMethod(col)
}
def chainUDF(udfName: String, cols: Column*): Column = {
callUDF(udfName, col +: cols: _*)
}
def nullBetween(lowerCol: Column, upperCol: Column): Column = {
when(lowerCol.isNull && upperCol.isNull, false).otherwise(
when(col.isNull, false).otherwise(
when(lowerCol.isNull && upperCol.isNotNull && col.leq(upperCol), true).otherwise(
when(lowerCol.isNotNull && upperCol.isNull && col.geq(lowerCol), true).otherwise(
col.between(lowerCol, upperCol)
)
)
)
)
}
def isFalsy: Column = {
when(col.isNull || col === lit(false), true).otherwise(false)
}
def isTruthy: Column = {
!col.isFalsy
}
def isNullOrBlank: Column = {
col.isNull || trim(col) === ""
}
def isNotIn(list: Any*): Column = {
not(col.isin(list: _*))
}
}
}
Author: MrPowers | Project: spark-daria | Lines: 49 | Source: ColumnExt.scala
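A hedged usage sketch of these extension methods: importing the object's members brings the implicit class into scope; the data and column names are illustrative and spark is assumed in scope:
// Sketch: using the ColumnExt extension methods
import com.github.mrpowers.spark.daria.sql.ColumnExt._
import org.apache.spark.sql.functions.{col, lower}
import spark.implicits._

val df = Seq(("  ", false), ("hello", true)).toDF("text", "flag")
df.select(
  col("text").isNullOrBlank.as("text_blank"),   // true for null or whitespace-only strings
  col("flag").isFalsy.as("flag_falsy"),         // true for null or false
  col("text").chain(lower).as("text_lower"),    // apply an ordinary Column => Column function
  col("flag").isNotIn(true).as("flag_not_true")
).show()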
Note: the org.apache.spark.sql.Column examples in this article were collected from GitHub, MSDocs, and other source-code and documentation platforms. The snippets are taken from open-source projects contributed by their respective authors; copyright remains with the original authors, and distribution and use must follow each project's license. Please do not republish without permission.