
Scala DataType Class Code Examples


This article collects typical usage examples of the org.apache.spark.sql.types.DataType class in Scala. If you are wondering what the DataType class is for, how to use it, or where to find concrete examples, the selected class code examples below should help.



The following presents 10 code examples of the DataType class, ordered by popularity by default.

Example 1: DateTimeColumn

// Set the package name and import the required classes
package me.danielpes.spark.datetime

import org.apache.spark.sql.Column
import org.apache.spark.sql.types.{DataType, DateType, TimestampType}
import org.apache.spark.sql.functions.{col, udf}

class DateTimeColumn(val col: Column, dataType: DataType = TimestampType) {

  def +(p: Period): Column = dataType match {
    case _: DateType => udf((d: java.sql.Date) => new RichDate(d) + p).apply(col)
    case _: TimestampType => udf((ts: java.sql.Timestamp) => new RichDate(ts) + p).apply(col)
  }

  def -(p: Period): Column = this.+(-p)

  override def toString: String = s"{column: ${col.toString}, type: ${dataType.toString}}"
}

object DateTimeColumn {

  def apply(col: Column, dataType: DataType = TimestampType) = new DateTimeColumn(col, dataType)
  def apply(col: Column, typeString: String) = new DateTimeColumn(col, typeFromString(typeString))
  def apply(cName: String) = new DateTimeColumn(col(cName), TimestampType)
  def apply(cName: String, dataType: DataType) = new DateTimeColumn(col(cName), dataType)
  def apply(cName: String, typeString: String) = new DateTimeColumn(col(cName), typeFromString(typeString))

  private def typeFromString(s: String): DataType = s match {
    case "date" => DateType
    case "timestamp" => TimestampType
  }
} 
Developer: danielpes | Project: spark-datetime-lite | Lines of code: 32 | Source: DateTimeColumn.scala
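A minimal usage sketch for DateTimeColumn (not part of the original example): it assumes a DataFrame with a timestamp column named event_ts, and it assumes the library's Period type can be constructed as Period(days = 1); check spark-datetime-lite for the actual Period constructor.

import org.apache.spark.sql.SparkSession
import me.danielpes.spark.datetime.{DateTimeColumn, Period}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical single-column DataFrame with one timestamp value.
val df = Seq(Tuple1(java.sql.Timestamp.valueOf("2020-01-01 00:00:00"))).toDF("event_ts")

// Period(days = 1) is an assumption about the library's API; adjust to its real constructor.
val shifted = df.withColumn("next_day", DateTimeColumn("event_ts") + Period(days = 1))
shifted.show(false)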


Example 2: MedicineProcess

// Set the package name and import the required classes
package cn.com.warlock.practice.ml

import java.io.BufferedReader
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}

import scala.collection.mutable.Set

class MedicineProcess(override val uid: String, private val dict: String)
  extends UnaryTransformer[Seq[String], Seq[String], MedicineProcess] {

  def this(dict: String) = this(Identifiable.randomUID("med"), dict)

  // Set of dictionary words, loaded once when the transformer is constructed
  private val wordsSet = loadDict

  // Load the medicine dictionary from the given path
  private def loadDict: Set[String] = {
    val br: BufferedReader = Files.newBufferedReader(Paths.get(dict), StandardCharsets.UTF_8)
    val words = Set[String]()

    var count = 0

    while (br.ready()) {
      words += br.readLine()
      count += 1
    }

    println(s"load med words: $count")

    words
  }

  override protected def createTransformFunc: Seq[String] => Seq[String] = (words: Seq[String]) => {
    // Fold over the input words: arr is the accumulated output list, c is the current word;
    // words found in the dictionary are followed by a "_MED_" tag.
    words.foldLeft(List[String]())((arr, c) => {
      val newC = wordsSet.contains(c) match {
        case true => List(c, "_MED_")
        case false => List(c)
      }
      arr ++ newC
    })
  }

  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType.isInstanceOf[ArrayType],
      s"The input column must be ArrayType, but got $inputType.")
  }

  override protected def outputDataType: DataType = new ArrayType(StringType, true)

  override def copy(extra: ParamMap): MedicineProcess = defaultCopy(extra)
} 
Developer: warlock-china | Project: spark-meepo | Lines of code: 59 | Source: MedicineProcess.scala
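A hedged usage sketch for the transformer above; the tokenized column name and the dictionary path are placeholders, not taken from the original project.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical tokenized input; the dictionary file path is a placeholder.
val tokens = Seq(Tuple1(Seq("patient", "took", "aspirin"))).toDF("words")

val medTagger = new MedicineProcess("/path/to/med_dict.txt")
  .setInputCol("words")
  .setOutputCol("tagged_words")

// Every word found in the dictionary is followed by a "_MED_" marker in the output column.
medTagger.transform(tokens).show(false)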


Example 3: convertSparkTypeToPigTypeCode

// Set the package name and import the required classes
package com.paypal.risk.madmen20.util

import org.apache.spark.sql.types
import org.apache.spark.sql.types.DataType

// Enclosing object inferred from the source file name (PigUtils.scala) so the snippet compiles.
object PigUtils {

  // Maps a Spark SQL DataType to the corresponding Pig type code.
  def convertSparkTypeToPigTypeCode(typ: DataType): Int = {
    typ match {
      case _: types.NullType => 1
      case _: types.BooleanType => 5
      case _: types.ByteType => 6
      case _: types.IntegerType => 10
      case _: types.LongType => 15
      case _: types.FloatType => 20
      case _: types.DoubleType => 25
      case _: types.TimestampType => 30
      case _: types.BinaryType => 50
      case _: types.StringType => 55
      case _: types.DecimalType => 70
      case _: types.MapType => 100
      case _: types.StructType => 110
      case _: types.ShortType => 10
      case _ => 0 // Unknown type
    }
  }

} 
Developer: yanlzhang8936 | Project: madmen20 | Lines of code: 28 | Source: PigUtils.scala
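A small sketch of applying the conversion to a whole schema; the schema itself is made up, and the PigUtils object name follows the restored wrapper above.

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import com.paypal.risk.madmen20.util.PigUtils.convertSparkTypeToPigTypeCode

// Map every field of a Spark schema to its Pig type code.
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType)
))
val pigCodes = schema.fields.map(f => f.name -> convertSparkTypeToPigTypeCode(f.dataType)).toMap
// Map(id -> 10, name -> 55)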


Example 4: TypeConversionConstraint

// Set the package name and import the required classes
package be.dataminded.wharlord.constraints

import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{Column, DataFrame}

import scala.util.Try

case class TypeConversionConstraint(columnName: String,
                                    convertedType: DataType) extends Constraint {

  val fun = (df: DataFrame) => {
    val originalColumn = new Column(columnName)
    val castedColumnName = columnName + "_casted"
    val maybeCasted = Try(df.select(originalColumn, originalColumn.cast(convertedType).as(castedColumnName)))
    val maybeFailedCastsAndOriginalType = maybeCasted.map(casted => {
      val failedCastsCount = casted.filter(new Column(castedColumnName).isNull && originalColumn.isNotNull).count
      val originalType = df.schema.find(_.name == columnName).get.dataType
      (failedCastsCount, originalType)
    })
    TypeConversionConstraintResult(
      constraint = this,
      data = maybeFailedCastsAndOriginalType.toOption.map{ case (failedCastsCount, originalType) =>
        TypeConversionConstraintResultData(
          originalType = originalType,
          failedRows = failedCastsCount
        )
      },
      status = tryToStatus[Long](maybeFailedCastsAndOriginalType.map{
        case (failedCastsCount, originalType) => failedCastsCount
      }, _ == 0)
    )
  }

}

case class TypeConversionConstraintResult(constraint: TypeConversionConstraint,
                                          data: Option[TypeConversionConstraintResultData],
                                          status: ConstraintStatus) extends ConstraintResult[TypeConversionConstraint] {

  val message: String = {
    val convertedType = constraint.convertedType
    val columnName = constraint.columnName
    val maybePluralSVerb = data.map(data => if (data.failedRows == 1) ("", "is") else ("s", "are"))
    (status, data, maybePluralSVerb) match {
      case (ConstraintSuccess, Some(TypeConversionConstraintResultData(originalType, 0)), _) =>
        s"Column $columnName can be converted from $originalType to $convertedType."
      case (ConstraintFailure, Some(TypeConversionConstraintResultData(originalType, failedRows)), Some((pluralS, verb))) =>
        s"Column $columnName cannot be converted from $originalType to $convertedType. " +
        s"$failedRows row$pluralS could not be converted."
      case (ConstraintError(throwable), None, None) =>
        s"Checking whether column $columnName can be converted to $convertedType failed: $throwable"
      case default => throw IllegalConstraintResultException(this)
    }
  }

}

case class TypeConversionConstraintResultData(originalType: DataType, failedRows: Long) 
Developer: datamindedbe | Project: wharlord | Lines of code: 59 | Source: TypeConversionConstraint.scala
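A hedged sketch of exercising the constraint directly through its fun value, outside wharlord's usual check API; the DataFrame and column name are invented for illustration.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.IntegerType

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// "abc" cannot be cast to Int, so exactly one failed cast is expected.
val df = Seq("1", "2", "abc").toDF("age")

val constraint = TypeConversionConstraint("age", IntegerType)
val result = constraint.fun(df)
println(result.message)  // reports whether "age" converts from StringType to IntegerType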


Example 5: DataframeCreationApp2

// Set the package name and import the required classes
package org.apress.prospark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.StructType
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

object DataframeCreationApp2 {

  def main(args: Array[String]) {
    if (args.length != 5) {
      System.err.println(
        "Usage: CdrDataframeApp2 <appname> <batchInterval> <hostname> <port> <schemaPath>")
      System.exit(1)
    }
    val Seq(appName, batchInterval, hostname, port, schemaFile) = args.toSeq

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)

    val ssc = new StreamingContext(conf, Seconds(batchInterval.toInt))

    val sqlC = new SQLContext(ssc.sparkContext)

    val schemaJson = scala.io.Source.fromFile(schemaFile).mkString
    val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]

    val cdrStream = ssc.socketTextStream(hostname, port.toInt)
      .map(_.split("\\t", -1))
      .foreachRDD(rdd => {
        val cdrs = sqlC.createDataFrame(rdd.map(c => Row(c: _*)), schema)
        
        cdrs.groupBy("countryCode").count().orderBy(desc("count")).show(5)
      })

    ssc.start()
    ssc.awaitTermination()

  }
} 
Developer: ZubairNabi | Project: prosparkstreaming | Lines of code: 47 | Source: L8-4DataFrameCreationSchema.scala
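The app expects the file at <schemaPath> to contain a schema in the JSON form understood by DataType.fromJson. A sketch of producing such a file follows; only the countryCode field used above is assumed, whereas a real CDR schema would have more columns.

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Serialize a StructType to the JSON representation that DataType.fromJson can parse back.
val schema = StructType(Seq(StructField("countryCode", StringType)))
Files.write(Paths.get("/tmp/cdr_schema.json"), schema.json.getBytes(StandardCharsets.UTF_8))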


Example 6: FieldPoly

// Set the package name and import the required classes
package com.esri.gdb

import java.nio.ByteBuffer

import com.esri.core.geometry.MultiPath
import org.apache.spark.sql.types.{DataType, Metadata}

@deprecated("not used", "0.4")
abstract class FieldPoly(name: String,
                         dataType: DataType,
                         nullValueAllowed: Boolean,
                         xOrig: Double,
                         yOrig: Double,
                         xyScale: Double,
                         metadata: Metadata)
  extends FieldBytes(name, dataType, nullValueAllowed, metadata) {

  protected var dx = 0L
  protected var dy = 0L

  def addPath(byteBuffer: ByteBuffer, numCoordinates: Int, path: MultiPath) = {
    0 until numCoordinates foreach (n => {
      dx += byteBuffer.getVarInt
      dy += byteBuffer.getVarInt
      val x = dx / xyScale + xOrig
      val y = dy / xyScale + yOrig
      n match {
        case 0 => path.startPath(x, y)
        case _ => path.lineTo(x, y)
      }
    })
    path
  }
} 
Developer: mraad | Project: spark-gdb | Lines of code: 35 | Source: FieldPoly.scala


Example 7: FieldBytes

// Set the package name and import the required classes
package com.esri.gdb

import java.nio.ByteBuffer

import org.apache.spark.sql.types.{DataType, Metadata}


abstract class FieldBytes(name: String,
                          dataType: DataType,
                          nullValueAllowed: Boolean,
                          metadata: Metadata = Metadata.empty
                         )
  extends Field(name, dataType, nullValueAllowed, metadata) {

  protected var m_bytes = new Array[Byte](1024)

  def getByteBuffer(byteBuffer: ByteBuffer) = {
    val numBytes = fillVarBytes(byteBuffer)
    ByteBuffer.wrap(m_bytes, 0, numBytes)
  }

  def fillVarBytes(byteBuffer: ByteBuffer) = {
    val numBytes = byteBuffer.getVarUInt.toInt
    if (numBytes > m_bytes.length) {
      m_bytes = new Array[Byte](numBytes)
    }
    0 until numBytes foreach {
      m_bytes(_) = byteBuffer.get
    }
    numBytes
  }
} 
Developer: mraad | Project: spark-gdb | Lines of code: 33 | Source: FieldBytes.scala


Example 8: SparkSessionExt

// Set the package name and import the required classes
package com.github.mrpowers.spark.daria.sql

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataType, StructField, StructType}

object SparkSessionExt {

  implicit class SparkSessionMethods(spark: SparkSession) {

    private def asRows[U](values: List[U]): List[Row] = {
      values map {
        case x: Row => x.asInstanceOf[Row]
        case y: Product => Row(y.productIterator.toList: _*)
        case a => Row(a)
      }
    }

    private def asSchema[U](fields: List[U]): List[StructField] = {
      fields map {
        case x: StructField => x.asInstanceOf[StructField]
        case (name: String, dataType: DataType, nullable: Boolean) =>
          StructField(name, dataType, nullable)
      }
    }

    def createDF[U, T](rowData: List[U], fields: List[T]): DataFrame = {
      spark.createDataFrame(
        spark.sparkContext.parallelize(asRows(rowData)),
        StructType(asSchema(fields))
      )
    }

  }

} 
Developer: MrPowers | Project: spark-daria | Lines of code: 37 | Source: SparkSessionExt.scala
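A usage sketch for createDF; the column names and values are invented for illustration.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType}
import com.github.mrpowers.spark.daria.sql.SparkSessionExt._

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Rows as tuples (handled by the Product case), schema as (name, type, nullable) triples.
val df = spark.createDF(
  List(("alice", 30), ("bob", 25)),
  List(("name", StringType, true), ("age", IntegerType, true))
)
df.show()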


Example 9: DataFrameInfo

// Set the package name and import the required classes
package org.tensorframes

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, StructType}


class DataFrameInfo private (cs: Array[ColumnInformation]) extends Serializable {
  def cols: Seq[ColumnInformation] = cs

  def explain: String = {
    val els = cols.map { c =>
      c.stf.map { i =>
        s"${i.dataType.toString}${i.shape.toString}"
      } .getOrElse { "??" + DataFrameInfo.pprint(c.field.dataType) }
    }
    els.mkString("DataFrame[", ", ", "]")
  }

  
  def merged: StructType = {
    StructType(cs.map(_.merged))
  }

  override def toString = explain
}

object DataFrameInfo {
  def pprint(s: DataType) = s.toString

  def apply(d: Seq[ColumnInformation]): DataFrameInfo = new DataFrameInfo(d.toArray)

  def get(df: DataFrame): DataFrameInfo = {
    new DataFrameInfo(df.schema.map(ColumnInformation.apply).toArray)
  }
} 
Developer: databricks | Project: tensorframes | Lines of code: 36 | Source: DataFrameInfo.scala
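A minimal sketch of inspecting a DataFrame with the helper above; the DataFrame itself is arbitrary.

import org.apache.spark.sql.DataFrame

// Prints one "dataType + shape" entry per column, or "??<type>" for columns
// that carry no tensor metadata.
def describeTensorSchema(df: DataFrame): Unit =
  println(DataFrameInfo.get(df).explain)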


Example 10: SqlShiftMySQLDialect

// Set the package name and import the required classes
package com.goibibo.sqlshift.commons


import java.sql.Types
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types.{DataType, IntegerType, LongType, MetadataBuilder}


case object SqlShiftMySQLDialect extends JdbcDialect {

    override def canHandle(url: String): Boolean = url.startsWith("jdbc:mysql")

    override def getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
        if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
            // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
            // byte arrays instead of longs.
            md.putLong("binarylong", 1)
            Option(LongType)
        } else if (typeName.equals("TINYINT")) {
            Option(IntegerType)
        } else None
    }

    override def quoteIdentifier(colName: String): String = {
        s"`$colName`"
    }

    override def getTableExistsQuery(table: String): String = {
        s"SELECT 1 FROM $table LIMIT 1"
    }

    def registerDialect(): Unit = {
        
    }
} 
Developer: goibibo | Project: SqlShift | Lines of code: 36 | Source: SqlShiftMySQLDialect.scala
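The registerDialect() helper above is shown with an empty body; a typical registration with Spark's JDBC dialect registry would look like the sketch below, which uses the standard JdbcDialects API rather than code taken from the SqlShift source.

import org.apache.spark.sql.jdbc.JdbcDialects

// Make the custom TINYINT/BIT handling and backtick quoting take effect
// for all subsequent JDBC reads and writes against jdbc:mysql URLs.
JdbcDialects.registerDialect(SqlShiftMySQLDialect)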



Note: The org.apache.spark.sql.types.DataType class examples in this article were collected from source code and documentation platforms such as GitHub and MSDocs. The code snippets were selected from open-source projects contributed by various developers; copyright remains with the original authors, and distribution and use must follow each project's license. Do not republish without permission.

