本文整理汇总了Scala中org.apache.spark.sql.types.DataType类的典型用法代码示例。如果您正苦于以下问题:Scala DataType类的具体用法?Scala DataType怎么用?Scala DataType使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了DataType类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: DateTimeColumn
//设置package包名称以及导入依赖的类
package me.danielpes.spark.datetime
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.{DataType, DateType, TimestampType}
import org.apache.spark.sql.functions.{col, udf}
/**
 * Wraps a Spark [[Column]] together with the Spark type of its values so that a `Period`
 * can be added to or subtracted from every value of the column.
 *
 * @param col      the wrapped column
 * @param dataType the Spark type of the column's values; only `DateType` and
 *                 `TimestampType` are handled by the arithmetic operators
 */
class DateTimeColumn(val col: Column, dataType: DataType = TimestampType) {

  /**
   * Builds a column that applies a UDF adding `p` to each value of the wrapped column.
   *
   * @param p the period to add
   * @return the derived column
   * @throws IllegalArgumentException if `dataType` is neither `DateType` nor `TimestampType`
   */
  def +(p: Period): Column = dataType match {
    case _: DateType => udf((d: java.sql.Date) => new RichDate(d) + p).apply(col)
    case _: TimestampType => udf((ts: java.sql.Timestamp) => new RichDate(ts) + p).apply(col)
    // Without this case an unsupported type escaped the match as an opaque MatchError.
    case other =>
      throw new IllegalArgumentException(
        s"Unsupported data type for date-time arithmetic: $other " +
          "(expected DateType or TimestampType)")
  }

  /** Subtracts `p` by adding its negation. */
  def -(p: Period): Column = this.+(-p)

  override def toString: String = s"{column: ${col.toString}, type: ${dataType.toString}}"
}
/**
 * Factory methods for `DateTimeColumn`. A column can be given either as a [[Column]] or by
 * name, and its type either as a [[DataType]] or as one of the strings "date" / "timestamp".
 */
object DateTimeColumn {

  def apply(col: Column, dataType: DataType = TimestampType): DateTimeColumn =
    new DateTimeColumn(col, dataType)

  def apply(col: Column, typeString: String): DateTimeColumn =
    new DateTimeColumn(col, typeFromString(typeString))

  def apply(cName: String): DateTimeColumn =
    new DateTimeColumn(col(cName), TimestampType)

  def apply(cName: String, dataType: DataType): DateTimeColumn =
    new DateTimeColumn(col(cName), dataType)

  def apply(cName: String, typeString: String): DateTimeColumn =
    new DateTimeColumn(col(cName), typeFromString(typeString))

  /**
   * Translates a type name into the matching Spark type.
   *
   * @throws IllegalArgumentException if `s` is neither "date" nor "timestamp"
   */
  private def typeFromString(s: String): DataType = s match {
    case "date" => DateType
    case "timestamp" => TimestampType
    // Without this case an unknown name escaped the match as an opaque MatchError.
    case other =>
      throw new IllegalArgumentException(
        s"""Unsupported type string: $other (expected "date" or "timestamp")""")
  }
}
开发者ID:danielpes,项目名称:spark-datetime-lite,代码行数:32,代码来源:DateTimeColumn.scala
示例2: MedicineProcess
//设置package包名称以及导入依赖的类
package cn.com.warlock.practice.ml
import java.io.BufferedReader
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}
import scala.collection.mutable.Set
/**
 * Transformer over arrays of words that inserts the marker token "_MED_" after every word
 * found in a dictionary file.
 *
 * @param uid  unique identifier of this transformer
 * @param dict path of the dictionary file, read as UTF-8 with one entry per line
 */
class MedicineProcess(override val uid: String, private val dict: String)
  extends UnaryTransformer[Seq[String], Seq[String], MedicineProcess] {

  def this(dict: String) = this(Identifiable.randomUID("med"), dict)

  // Dictionary entries, loaded once when the transformer is constructed.
  private val wordsSet = loadDict

  /**
   * Reads every line of the dictionary file into a set and prints the number of lines read.
   * The reader is closed even when reading fails.
   */
  private def loadDict: Set[String] = {
    val br: BufferedReader = Files.newBufferedReader(Paths.get(dict), StandardCharsets.UTF_8)
    try {
      // readLine() returns null at end of stream; ready() only reports whether a read
      // would block, so it is not used as the end-of-file test.
      val lines = Iterator.continually(br.readLine()).takeWhile(_ != null).toList
      val words = Set[String]()
      words ++= lines
      println(s"load med words: ${lines.size}")
      words
    } finally {
      br.close()
    }
  }

  /** Emits each input word in order, followed by "_MED_" when the word is in the dictionary. */
  override protected def createTransformFunc: Seq[String] => Seq[String] = (words: Seq[String]) => {
    // flatMap builds the result in one pass instead of re-copying an accumulator per word.
    words.toList.flatMap { word =>
      if (wordsSet.contains(word)) List(word, "_MED_") else List(word)
    }
  }

  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType.isInstanceOf[ArrayType],
      s"The input column must be ArrayType, but got $inputType.")
  }

  override protected def outputDataType: DataType = new ArrayType(StringType, true)

  override def copy(extra: ParamMap): MedicineProcess = defaultCopy(extra)
}
开发者ID:warlock-china,项目名称:spark-meepo,代码行数:59,代码来源:MedicineProcess.scala
示例3: convertSparkTypeToPigTypeCode
//设置package包名称以及导入依赖的类
package com.paypal.risk.madmen20.util
import org.apache.spark.sql.types
import org.apache.spark.sql.types.DataType
/**
 * Maps a Spark SQL data type to an integer type code (a Pig type code, going by the
 * function name).
 *
 * @param typ the Spark data type to translate
 * @return the code for `typ`, or 0 when the type has no mapping here
 */
def convertSparkTypeToPigTypeCode(typ: DataType): Int = typ match {
  case _: types.NullType => 1
  case _: types.BooleanType => 5
  // Integral types; ShortType shares the code used for IntegerType.
  case _: types.ByteType => 6
  case _: types.ShortType | _: types.IntegerType => 10
  case _: types.LongType => 15
  // Floating-point and decimal types.
  case _: types.FloatType => 20
  case _: types.DoubleType => 25
  case _: types.DecimalType => 70
  case _: types.TimestampType => 30
  // Byte and character data.
  case _: types.BinaryType => 50
  case _: types.StringType => 55
  // Complex types.
  case _: types.MapType => 100
  case _: types.StructType => 110
  // Anything else is reported as unknown.
  case _ => 0
}
}
开发者ID:yanlzhang8936,项目名称:madmen20,代码行数:28,代码来源:PigUtils.scala
示例4: TypeConversionConstraint
//设置package包名称以及导入依赖的类
package be.dataminded.wharlord.constraints
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{Column, DataFrame}
import scala.util.Try
/**
 * Constraint checking whether the values of a column can be cast to another type.
 *
 * @param columnName    name of the column to check
 * @param convertedType type the column is cast to
 */
case class TypeConversionConstraint(columnName: String,
                                    convertedType: DataType) extends Constraint {

  /**
   * Casts the column, counts the rows whose cast value is null although the original value
   * is not, and wraps the outcome in a `TypeConversionConstraintResult`. A failure while
   * selecting or counting is captured in a `Try` and handed to `tryToStatus`.
   */
  val fun = (df: DataFrame) => {
    val source = new Column(columnName)
    val castedName = s"${columnName}_casted"

    val attempt = Try(df.select(source, source.cast(convertedType).as(castedName))).map { casted =>
      val failures = casted.filter(new Column(castedName).isNull && source.isNotNull).count
      val sourceType = df.schema.find(_.name == columnName).get.dataType
      (failures, sourceType)
    }

    // Result data exists only when the cast and the count both succeeded.
    val resultData = attempt.toOption.map { case (failures, sourceType) =>
      TypeConversionConstraintResultData(originalType = sourceType, failedRows = failures)
    }
    val resultStatus = tryToStatus[Long](attempt.map(_._1), _ == 0)

    TypeConversionConstraintResult(constraint = this, data = resultData, status = resultStatus)
  }
}
/**
 * Outcome of evaluating a `TypeConversionConstraint`.
 *
 * @param constraint the constraint that was evaluated
 * @param data       original column type and failed-row count, when the check could run
 * @param status     status of the check
 */
case class TypeConversionConstraintResult(constraint: TypeConversionConstraint,
                                          data: Option[TypeConversionConstraintResultData],
                                          status: ConstraintStatus) extends ConstraintResult[TypeConversionConstraint] {

  /**
   * Human-readable description of the result. Any combination of status and data other
   * than the three handled below raises `IllegalConstraintResultException`.
   */
  val message: String = {
    val targetType = constraint.convertedType
    val column = constraint.columnName
    // Plural suffix and verb for the failed-row count; only the suffix is used in the text.
    val wording = data.map { d =>
      if (d.failedRows == 1) ("", "is") else ("s", "are")
    }

    (status, data, wording) match {
      case (ConstraintSuccess, Some(TypeConversionConstraintResultData(originalType, 0)), _) =>
        s"Column $column can be converted from $originalType to $targetType."

      case (ConstraintFailure, Some(TypeConversionConstraintResultData(originalType, failedRows)), Some((pluralS, _))) =>
        s"Column $column cannot be converted from $originalType to $targetType. " +
          s"$failedRows row$pluralS could not be converted."

      case (ConstraintError(throwable), None, None) =>
        s"Checking whether column $column can be converted to $targetType failed: $throwable"

      case _ => throw IllegalConstraintResultException(this)
    }
  }
}
/**
 * Data attached to a type-conversion check result.
 *
 * @param originalType type of the column before the cast
 * @param failedRows   number of rows counted as failed conversions
 */
case class TypeConversionConstraintResultData(originalType: DataType, failedRows: Long)
开发者ID:datamindedbe,项目名称:wharlord,代码行数:59,代码来源:TypeConversionConstraint.scala
示例5: DataframeCreationApp2
//设置package包名称以及导入依赖的类
package org.apress.prospark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.StructType
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
/**
 * Streaming application that reads tab-separated lines from a socket, turns each batch into
 * a DataFrame using a schema loaded from a JSON file, and shows the five most frequent
 * `countryCode` values of the batch.
 */
object DataframeCreationApp2 {

  /**
   * Entry point.
   *
   * @param args appname, batch interval in seconds, hostname, port and schema file path
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 5) {
      System.err.println(
        "Usage: CdrDataframeApp2 <appname> <batchInterval> <hostname> <port> <schemaPath>")
      System.exit(1)
    }
    val Seq(appName, batchInterval, hostname, port, schemaFile) = args.toSeq

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)

    val ssc = new StreamingContext(conf, Seconds(batchInterval.toInt))
    val sqlC = new SQLContext(ssc.sparkContext)

    // Close the schema file once it has been read instead of leaking the handle.
    val schemaSource = scala.io.Source.fromFile(schemaFile)
    val schemaJson = try schemaSource.mkString finally schemaSource.close()

    // Fail with a clear message when the JSON does not describe a struct.
    val schema = DataType.fromJson(schemaJson) match {
      case struct: StructType => struct
      case other =>
        throw new IllegalArgumentException(
          s"Schema file $schemaFile must describe a StructType, but got $other")
    }

    // foreachRDD returns Unit, so the stream pipeline is not bound to a name.
    ssc.socketTextStream(hostname, port.toInt)
      .map(_.split("\\t", -1))
      .foreachRDD(rdd => {
        val cdrs = sqlC.createDataFrame(rdd.map(c => Row(c: _*)), schema)
        cdrs.groupBy("countryCode").count().orderBy(desc("count")).show(5)
      })

    ssc.start()
    ssc.awaitTermination()
  }
}
开发者ID:ZubairNabi,项目名称:prosparkstreaming,代码行数:47,代码来源:L8-4DataFrameCreationSchema.scala
示例6: FieldPoly
//设置package包名称以及导入依赖的类
package com.esri.gdb
import java.nio.ByteBuffer
import com.esri.core.geometry.MultiPath
import org.apache.spark.sql.types.{DataType, Metadata}
/**
 * Base class for fields that decode coordinate paths from a byte buffer.
 *
 * @param xOrig   offset added to every decoded x value
 * @param yOrig   offset added to every decoded y value
 * @param xyScale divisor applied to the accumulated x and y values
 */
@deprecated("not used", "0.4")
abstract class FieldPoly(name: String,
                         dataType: DataType,
                         nullValueAllowed: Boolean,
                         xOrig: Double,
                         yOrig: Double,
                         xyScale: Double,
                         metadata: Metadata)
  extends FieldBytes(name, dataType, nullValueAllowed, metadata) {

  // Running totals of the values read so far; addPath never resets them.
  protected var dx = 0L
  protected var dy = 0L

  /**
   * Reads `numCoordinates` pairs of values from `byteBuffer`, adds each pair to the running
   * totals, and appends the scaled and offset point to `path`: the first point starts a
   * new path, every later point extends it with a line.
   *
   * @return the `path` argument
   */
  def addPath(byteBuffer: ByteBuffer, numCoordinates: Int, path: MultiPath) = {
    for (n <- 0 until numCoordinates) {
      dx += byteBuffer.getVarInt
      dy += byteBuffer.getVarInt
      val x = dx / xyScale + xOrig
      val y = dy / xyScale + yOrig
      if (n == 0) path.startPath(x, y) else path.lineTo(x, y)
    }
    path
  }
}
开发者ID:mraad,项目名称:spark-gdb,代码行数:35,代码来源:FieldPoly.scala
示例7: FieldBytes
//设置package包名称以及导入依赖的类
package com.esri.gdb
import java.nio.ByteBuffer
import org.apache.spark.sql.types.{DataType, Metadata}
/**
 * Base class for fields whose value is a length-prefixed run of bytes. The bytes are copied
 * into a scratch array that is reused between calls and grown when needed.
 */
abstract class FieldBytes(name: String,
                          dataType: DataType,
                          nullValueAllowed: Boolean,
                          metadata: Metadata = Metadata.empty
                         )
  extends Field(name, dataType, nullValueAllowed, metadata) {

  // Scratch array shared by all reads; replaced by a larger one when a value does not fit.
  protected var m_bytes = new Array[Byte](1024)

  /**
   * Reads the next value and returns a buffer over the scratch array holding it. The
   * returned buffer shares `m_bytes`, so a later read overwrites its contents.
   */
  def getByteBuffer(byteBuffer: ByteBuffer) = {
    val numBytes = fillVarBytes(byteBuffer)
    ByteBuffer.wrap(m_bytes, 0, numBytes)
  }

  /**
   * Reads the length prefix from `byteBuffer`, then copies that many bytes into the start
   * of the scratch array.
   *
   * @return the length read from the prefix
   */
  def fillVarBytes(byteBuffer: ByteBuffer) = {
    val numBytes = byteBuffer.getVarUInt.toInt
    if (numBytes > m_bytes.length) {
      m_bytes = new Array[Byte](numBytes)
    }
    // One bulk copy replaces the per-byte loop. The guard keeps a non-positive length a
    // no-op that is simply returned, as the loop did.
    if (numBytes > 0) {
      byteBuffer.get(m_bytes, 0, numBytes)
    }
    numBytes
  }
}
开发者ID:mraad,项目名称:spark-gdb,代码行数:33,代码来源:FieldBytes.scala
示例8: SparkSessionExt
//设置package包名称以及导入依赖的类
package com.github.mrpowers.spark.daria.sql
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataType, StructField, StructType}
/** Extension methods for [[SparkSession]]. */
object SparkSessionExt {

  implicit class SparkSessionMethods(spark: SparkSession) {

    /**
     * Converts each value to a [[Row]]: rows are kept, products (tuples, case classes) are
     * expanded into one column per element, anything else becomes a single-column row.
     */
    private def asRows[U](values: List[U]): List[Row] = {
      values map {
        // The typed pattern already yields a Row; no cast is needed.
        case x: Row => x
        case y: Product => Row(y.productIterator.toList: _*)
        case a => Row(a)
      }
    }

    /**
     * Converts each field description to a [[StructField]]. A description is either a
     * `StructField` or a `(name, dataType, nullable)` triple.
     *
     * @throws IllegalArgumentException for any other kind of description
     */
    private def asSchema[U](fields: List[U]): List[StructField] = {
      fields map {
        case x: StructField => x
        case (name: String, dataType: DataType, nullable: Boolean) =>
          StructField(name, dataType, nullable)
        // Without this case an unsupported description escaped as an opaque MatchError.
        case other =>
          throw new IllegalArgumentException(
            s"Unsupported field description: $other " +
              "(expected a StructField or a (String, DataType, Boolean) tuple)")
      }
    }

    /**
     * Creates a DataFrame from in-memory rows and field descriptions.
     *
     * @param rowData the rows, in any form accepted by `asRows`
     * @param fields  the schema fields, in any form accepted by `asSchema`
     */
    def createDF[U, T](rowData: List[U], fields: List[T]): DataFrame = {
      spark.createDataFrame(
        spark.sparkContext.parallelize(asRows(rowData)),
        StructType(asSchema(fields))
      )
    }
  }
}
开发者ID:MrPowers,项目名称:spark-daria,代码行数:37,代码来源:SparkSessionExt.scala
示例9: DataFrameInfo
//设置package包名称以及导入依赖的类
package org.tensorframes
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, StructType}
/**
 * Per-column information for a DataFrame. Instances are created through the companion
 * object.
 */
class DataFrameInfo private (cs: Array[ColumnInformation]) extends Serializable {

  /** The column information entries, in order. */
  def cols: Seq[ColumnInformation] = cs

  /**
   * One-line description of all columns. A column with `stf` defined is shown as its data
   * type followed by its shape; any other column is shown as "??" followed by the type of
   * its field.
   */
  def explain: String = {
    val described = cols.map { column =>
      column.stf.fold("??" + DataFrameInfo.pprint(column.field.dataType)) { info =>
        s"${info.dataType.toString}${info.shape.toString}"
      }
    }
    described.mkString("DataFrame[", ", ", "]")
  }

  /** Schema assembled from the `merged` value of every column. */
  def merged: StructType = StructType(cs.map(_.merged))

  override def toString = explain
}
/** Factories and helpers for `DataFrameInfo`. */
object DataFrameInfo {

  /** Textual form of a data type, as given by its `toString`. */
  def pprint(s: DataType) = s.toString

  /** Builds the info object from already computed column information. */
  def apply(d: Seq[ColumnInformation]): DataFrameInfo = new DataFrameInfo(d.toArray)

  /** Builds the info object from the schema of `df`, one entry per schema field. */
  def get(df: DataFrame): DataFrameInfo = {
    val columns = df.schema.map(ColumnInformation.apply).toArray
    new DataFrameInfo(columns)
  }
}
开发者ID:databricks,项目名称:tensorframes,代码行数:36,代码来源:DataFrameInfo.scala
示例10: SqlShiftMySQLDialect
//设置package包名称以及导入依赖的类
package com.goibibo.sqlshift.commons
import java.sql.Types
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types.{DataType, IntegerType, LongType, MetadataBuilder}
/**
 * JDBC dialect for URLs starting with "jdbc:mysql". It maps multi-bit BIT columns to
 * `LongType` and TINYINT columns to `IntegerType`, and quotes identifiers with backticks.
 */
case object SqlShiftMySQLDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mysql")

  /**
   * Chooses the Catalyst type for a JDBC column.
   *
   * @return `Some(LongType)` for VARBINARY columns named "BIT" with size other than 1,
   *         `Some(IntegerType)` for "TINYINT", otherwise `None`
   */
  override def getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    // `==` is null-safe, unlike `typeName.equals(...)`.
    if (sqlType == Types.VARBINARY && typeName == "BIT" && size != 1) {
      // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
      // byte arrays instead of longs.
      md.putLong("binarylong", 1)
      Some(LongType)
    } else if (typeName == "TINYINT") {
      Some(IntegerType)
    } else None
  }

  override def quoteIdentifier(colName: String): String = {
    s"`$colName`"
  }

  override def getTableExistsQuery(table: String): String = {
    s"SELECT 1 FROM $table LIMIT 1"
  }

  /**
   * Registers this dialect with Spark's `JdbcDialects`.
   *
   * NOTE(review): the body was previously empty although `JdbcDialects` is imported, so
   * calling this method had no effect. Confirm the dialect is not meant to be registered
   * elsewhere.
   */
  def registerDialect(): Unit = {
    JdbcDialects.registerDialect(this)
  }
}
开发者ID:goibibo,项目名称:SqlShift,代码行数:36,代码来源:SqlShiftMySQLDialect.scala
注:本文中的org.apache.spark.sql.types.DataType类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论