
Scala BaseRelation Class Code Examples


This article collects typical usage examples of the Scala class org.apache.spark.sql.sources.BaseRelation. If you are wondering what BaseRelation does, how it is used, or want to see it in real Scala code, the curated class examples below should help.



The following presents 14 code examples of the BaseRelation class, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Scala code examples.
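Before the individual examples, here is a minimal orientation sketch of how a BaseRelation-backed data source is typically consumed. As in all the sketches below, a SQLContext named sqlContext is assumed to be in scope, and the format string and options are placeholders; Spark resolves format("com.example.mysource") to the class com.example.mysource.DefaultSource, which in turn builds the BaseRelation.

// Hypothetical usage sketch: the format string and options below are placeholders.
val df = sqlContext.read
  .format("com.example.mysource")   // resolved to com.example.mysource.DefaultSource
  .option("path", "/data/input")    // passed to createRelation in the parameters map
  .load()

df.printSchema()
df.show()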

Example 1: DefaultSource

// Package name and imported dependencies
package com.jasonfeist.spark.tika

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types._


class DefaultSource
  extends RelationProvider with SchemaRelationProvider  {

    def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
      val path = parameters.getOrElse("path", sys.error("No path specified."))
      new TikaMetadataRelation(
        path,
        schema,
        new MetadataExtractor(),
        new FieldDataExtractor())(sqlContext)
    }

    override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
      val struct =
        StructType(
              StructField("detectedtype", StringType, true) ::
              StructField("language", StringType, true) ::
              StructField("filename", StringType, true) ::
              StructField("author", StringType, true)  ::
              StructField("text", StringType, true)  ::
              StructField("creation-date", TimestampType, true) ::
              StructField("title", StringType, true) ::
              StructField("content-length", IntegerType, true) ::
              StructField("last-modified", DateType, true) :: Nil
        )
      createRelation(sqlContext, parameters, struct)
    }
  } 
Author: jasonfeist | Project: tika-spark-datasource | Lines: 36 | Source: DefaultSource.scala


Example 2: TikaMetadataRelation

// Package name and imported dependencies
package com.jasonfeist.spark.tika

import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{StructType}
import org.slf4j.LoggerFactory


class TikaMetadataRelation protected[tika] (path: String,
                                            userSchema: StructType,
                                            metadataExtractor: MetadataExtractor,
                                            fieldDataExtractor: FieldDataExtractor)
                          (@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan with Serializable {

  val logger = LoggerFactory.getLogger(classOf[TikaMetadataRelation])

  override def schema: StructType = this.userSchema

  override def buildScan(): RDD[Row] = {

    val rdd = sqlContext
      .sparkContext.binaryFiles(path)
    rdd.map(extractFunc(_))
  }

  def extractFunc(file: (String, PortableDataStream)): Row = {
    val extractedData = metadataExtractor.extract(file)
    val rowArray = new Array[Any](schema.fields.length)
    var index = 0
    while (index < schema.fields.length) {
      val field = schema(index)
      val fieldData = fieldDataExtractor.matchedField(field.name,
        field.dataType, extractedData._1, file._1, extractedData._2,
        extractedData._3)
      rowArray(index) = fieldData
      index = index + 1
    }
    Row.fromSeq(rowArray)
  }
} 
Author: jasonfeist | Project: tika-spark-datasource | Lines: 47 | Source: TikaMetadataRelation.scala
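Examples 1 and 2 together form a complete data source under the package com.jasonfeist.spark.tika. A hedged usage sketch with a placeholder input directory; the selected columns come from the default schema in Example 1:

val docs = sqlContext.read
  .format("com.jasonfeist.spark.tika")
  .option("path", "/data/documents")   // "path" is required by this DefaultSource
  .load()

docs.select("filename", "detectedtype", "language", "title").show()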


Example 3: StudyRelation

// Package name and imported dependencies
package com.study.spark.datasource

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}


class StudyRelation(parameters: Map[String, String])(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  override def schema: StructType = {
    // The schema is hard-coded for this study example: three string columns.
    // A real source would typically derive it from the underlying data instead.
    val fields: Array[StructField] = new Array[StructField](3)
    fields.update(0, new StructField("field1", StringType))
    fields.update(1, new StructField("field2", StringType))
    fields.update(2, new StructField("field3", StringType))
    new StructType(fields)
  }

  // Create a StudyRDD, which supplies the RDD[Row] for this scan.
  override def buildScan(): RDD[Row] = new StudyRDD(sqlContext, schema)
} 
Author: hackpupu | Project: LML | Lines: 24 | Source: StudyRelation.scala


Example 4: sample

// Package name and imported dependencies
package com.rishabh.spark.datasource.s3


import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}


case class sample(id: Integer)

class S3Relation(accesskey: String, secretkey: String, fileType: String,
                 bucket: String, path: String, write: Boolean)
                (@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  import sqlContext.implicits._

  val dummyData = Seq(sample(1))
  var df = sqlContext.sparkContext.parallelize(dummyData, 4).toDF()
  val s3Path = "s3a://" + bucket + path

  val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
  hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  hadoopConf.set("fs.s3a.access.key", accesskey)
  hadoopConf.set("fs.s3a.secret.key", secretkey)

  override def schema: StructType = {
    fileType match {
      case "json" =>
        df = sqlContext.read.json(s3Path)
      case "csv" =>
        df = sqlContext.read.format("com.databricks.spark.csv").load(s3Path)
      case "parquet" =>
        df = sqlContext.read.parquet(s3Path)
    }
    df.schema
  }

  override def buildScan(): RDD[Row] = {
    df.rdd
  }
} 
Author: rishabhbhardwaj | Project: spark-datasource-s3 | Lines: 44 | Source: S3Relation.scala


Example 5: DefaultSource

// Package name and imported dependencies
package com.rishabh.spark.datasource.s3

import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

class DefaultSource extends RelationProvider with CreatableRelationProvider {

  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]):
  BaseRelation = {
    val accessKey = parameters.getOrElse("accesskey", sys.error("accesskey is required"))
    val secretKey = parameters.getOrElse("secretkey", sys.error("secretkey is required"))
    val fileType = parameters.getOrElse("type", sys.error("filetype is required"))
    val path = parameters.getOrElse("path", sys.error("path is required"))
    val bucket = parameters.getOrElse("bucketName", sys.error("bucket is required"))
    new S3Relation(accessKey, secretKey, fileType, bucket, path, false)(sqlContext)
  }

  override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String,
    String], data: DataFrame): BaseRelation = {
    val accesskey = parameters.getOrElse("accesskey", sys.error("accesskey is required"))
    val secretkey = parameters.getOrElse("secretkey", sys.error("secretkey is required"))
    val bucket = parameters.getOrElse("bucketName", sys.error("bucket is required"))
    val fileType = parameters.getOrElse("type", sys.error("filetype is required"))
    val path = parameters.getOrElse("path", sys.error("path is required"))

    val supported = List("json", "parquet", "csv")
    if (!supported.contains(fileType)) {
      sys.error("fileType " + fileType + " not supported.")
    }
    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoopConf.set("fs.s3a.access.key", accesskey)
    hadoopConf.set("fs.s3a.secret.key", secretkey)
    val s3Path = "s3a://" + bucket + path
    doSave(fileType, data, s3Path)

    new S3Relation(accesskey, secretkey, fileType, bucket, path, true)(sqlContext)
  }

  private def doSave(fileType: String, dataFrame: DataFrame, path: String) = {
    fileType match {
      case "json" =>
        dataFrame.write.json(path)
      case "parquet" =>
        dataFrame.write.parquet(path)
      case "csv" =>
        dataFrame.write.format("com.databricks.spark.csv").save(path)
    }
  }
} 
Author: rishabhbhardwaj | Project: spark-datasource-s3 | Lines: 51 | Source: DefaultSource.scala
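Both S3 classes receive their configuration through the parameters map, so reading and writing might look like the sketch below; credentials, bucket, and paths are placeholders, and the option names mirror the getOrElse keys in this DefaultSource:

val s3df = sqlContext.read
  .format("com.rishabh.spark.datasource.s3")
  .option("accesskey", "<AWS_ACCESS_KEY>")
  .option("secretkey", "<AWS_SECRET_KEY>")
  .option("type", "csv")                 // json, parquet or csv
  .option("bucketName", "my-bucket")
  .option("path", "/input/data.csv")
  .load()

s3df.write
  .format("com.rishabh.spark.datasource.s3")
  .option("accesskey", "<AWS_ACCESS_KEY>")
  .option("secretkey", "<AWS_SECRET_KEY>")
  .option("type", "parquet")
  .option("bucketName", "my-bucket")
  .option("path", "/output/data")
  .save()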


Example 6: NTriplesRelation

// Package name and imported dependencies
package net.sansa_stack.inference.spark.data.loader.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import net.sansa_stack.inference.utils.NTriplesStringToRDFTriple

class NTriplesRelation(location: String, userSchema: StructType)
                      (@transient val sqlContext: SQLContext)
    extends BaseRelation
      with TableScan
      with Serializable {
    override def schema: StructType = {
      if (this.userSchema != null) {
        this.userSchema
      }
      else {
        StructType(
          Seq(
            StructField("s", StringType, true),
            StructField("p", StringType, true),
            StructField("o", StringType, true)
        ))
      }
    }
    override def buildScan(): RDD[Row] = {
      val rdd = sqlContext
        .sparkContext
        .textFile(location)

      val converter = new NTriplesStringToRDFTriple()

      val rows = rdd.flatMap(x => converter.apply(x)).map(t => Row.fromSeq(Seq(t.s, t.p, t.o)))

      rows
    }
  } 
Author: SANSA-Stack | Project: SANSA-Inference | Lines: 40 | Source: NTriplesRelation.scala


Example 7: DefaultSource

// Package name and imported dependencies
package net.sansa_stack.inference.spark.data.loader.sql

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType


class DefaultSource extends RelationProvider with SchemaRelationProvider {
    override def createRelation(sqlContext: SQLContext, parameters: Map[String, String])
    : BaseRelation = {
      createRelation(sqlContext, parameters, null)
    }
    override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]
                                , schema: StructType)
    : BaseRelation = {
      val path = parameters.getOrElse("path", sys.error("'path' must be specified for our data."))
      new NTriplesRelation(path, schema)(sqlContext)
    }
  } 
Author: SANSA-Stack | Project: SANSA-Inference | Lines: 20 | Source: DefaultSource.scala
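The SANSA N-Triples source falls back to a fixed s/p/o string schema when no user schema is given. A usage sketch with a placeholder file location:

val triples = sqlContext.read
  .format("net.sansa_stack.inference.spark.data.loader.sql")
  .load("/data/dataset.nt")   // load(path) supplies the required "path" parameter

triples.select("s", "p", "o").show(10)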


Example 8: GDBRelation

// Package name and imported dependencies
package com.esri.gdb

import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}


case class GDBRelation(gdbPath: String, gdbName: String, numPartition: Int)
                      (@transient val sqlContext: SQLContext)
  extends BaseRelation with Logging with TableScan {

  override val schema = inferSchema()

  private def inferSchema() = {
    val sc = sqlContext.sparkContext
    GDBTable.findTable(gdbPath, gdbName, sc.hadoopConfiguration) match {
      case Some(catTab) => {
        val table = GDBTable(gdbPath, catTab.hexName, sc.hadoopConfiguration)
        try {
          table.schema()
        } finally {
          table.close()
        }
      }
      case _ => {
        log.error(s"Cannot find '$gdbName' in $gdbPath, creating an empty schema !")
        StructType(Seq.empty[StructField])
      }
    }
  }

  override def buildScan(): RDD[Row] = {
    GDBRDD(sqlContext.sparkContext, gdbPath, gdbName, numPartition)
  }
} 
Author: mraad | Project: spark-gdb | Lines: 38 | Source: GDBRelation.scala
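This excerpt shows only the relation, not a RelationProvider, so the sketch below constructs GDBRelation directly and converts it with SQLContext.baseRelationToDataFrame; the geodatabase path, table name, and partition count are placeholders:

val relation = GDBRelation("/data/Sample.gdb", "Cities", numPartition = 8)(sqlContext)
val cities = sqlContext.baseRelationToDataFrame(relation)
cities.printSchema()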


Example 9: DefaultSource

// Package name and imported dependencies
package com.github.traviscrawford.spark.dynamodb

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.RelationProvider
import org.apache.spark.sql.sources.SchemaRelationProvider
import org.apache.spark.sql.types.StructType

private[dynamodb] class DefaultSource
  extends RelationProvider with SchemaRelationProvider {

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String])
    : BaseRelation = getDynamoDBRelation(sqlContext, parameters)

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType)
    : BaseRelation = getDynamoDBRelation(sqlContext, parameters, Some(schema))

  private def getDynamoDBRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      maybeSchema: Option[StructType] = None)
    : DynamoDBRelation = {

    val tableName = parameters.getOrElse("table",
      throw new IllegalArgumentException("Required parameter 'table' was unspecified.")
    )

    DynamoDBRelation(
      tableName = tableName,
      maybeFilterExpression = parameters.get("filter_expression"),
      maybePageSize = parameters.get("page_size"),
      maybeRegion = parameters.get("region"),
      maybeSegments = parameters.get("segments"),
      maybeRateLimit = parameters.get("rate_limit_per_segment").map(Integer.parseInt),
      maybeSchema = maybeSchema,
      maybeCredentials = parameters.get("aws_credentials_provider"),
      maybeEndpoint = parameters.get("endpoint"))(sqlContext)
  }
} 
Author: traviscrawford | Project: spark-dynamodb | Lines: 45 | Source: DefaultSource.scala
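The DynamoDB provider requires only the table name and treats everything else as optional. A sketch under those assumptions, with placeholder values:

val users = sqlContext.read
  .format("com.github.traviscrawford.spark.dynamodb")
  .option("table", "users")                  // required
  .option("region", "us-west-2")             // optional, see maybeRegion
  .option("rate_limit_per_segment", "100")   // optional read throttle
  .load()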


Example 10: DefaultSource

// Package name and imported dependencies
package com.springml.spark.workday

import com.springml.spark.workday.model.{WWSInput, XPathInput}
import com.springml.spark.workday.util.CSVUtil
import org.apache.log4j.Logger
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}


class DefaultSource extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
  @transient val logger = Logger.getLogger(classOf[DefaultSource])

  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation = {
    createRelation(sqlContext, parameters, null)
  }

  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String],
                              schema: StructType): BaseRelation = {
    val username = param(parameters, "username")
    val password = param(parameters, "password")
    val wwsEndpoint = param(parameters, "wwsEndpoint")
    val objectTag = param(parameters, "objectTagPath")
    val detailsTag = param(parameters, "detailsTagPath")
    val request = param(parameters, "request")
    val xpath = param(parameters, "xpathMap")
    val namespacePrefix = parameters.get("namespacePrefixMap")

    val wwsInput = new WWSInput(username, password, wwsEndpoint, request)
    val xPathInput = new XPathInput(objectTag, detailsTag)
    CSVUtil.populateXPathInput(xpath, xPathInput)
    xPathInput.namespaceMap = CSVUtil.readCSV(namespacePrefix.get)
    logger.debug("Namespace Map" + xPathInput.namespaceMap)

    val records = new WWSReader(wwsInput, xPathInput) read()
    new DatasetRelation(records, sqlContext, schema)
  }

  override def createRelation(sqlContext: SQLContext,
                              mode: SaveMode,
                              parameters: Map[String, String],
                              data: DataFrame): BaseRelation = {
    logger.error("Save not supported by workday connector")
    throw new UnsupportedOperationException
  }

  private def param(parameters: Map[String, String],
                    paramName: String) : String = {
    val paramValue = parameters.getOrElse(paramName,
      sys.error(s"""'$paramName' must be specified for Spark Workday package"""))

    if ("password".equals(paramName)) {
      logger.debug("Param " + paramName + " value " + paramValue)
    }

    paramValue
  }

} 
Author: springml | Project: spark-workday | Lines: 62 | Source: DefaultSource.scala
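Every option read through param() in this DefaultSource is mandatory, and namespacePrefixMap is read separately. A sketch using placeholder values only:

val workdayDf = sqlContext.read
  .format("com.springml.spark.workday")
  .option("username", "<WORKDAY_USER>")
  .option("password", "<WORKDAY_PASSWORD>")
  .option("wwsEndpoint", "<wws-endpoint-url>")
  .option("objectTagPath", "<object-tag-xpath>")
  .option("detailsTagPath", "<details-tag-xpath>")
  .option("request", "<request-payload-or-file>")
  .option("xpathMap", "<path-to-xpath-mapping.csv>")
  .option("namespacePrefixMap", "<path-to-namespace-prefix.csv>")
  .load()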


Example 11: DefaultSource

// Package name and imported dependencies
package org.apache.spark.sql.sparkcv

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType
import org.bytedeco.javacpp.opencv_core.IplImage
import org.bytedeco.javacpp.opencv_imgcodecs.cvLoadImage

class DefaultSource
  extends RelationProvider
    with SchemaRelationProvider
    with CreatableRelationProvider
    with Logging {

  
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
    createRelation(sqlContext, parameters, new StructType())
  }

  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
    assert(parameters.get("path").isDefined, "path parameter is required")
    // NOTE: this sample eagerly loads a bundled test image; the result is not used by ImageRelation.
    val image: IplImage = cvLoadImage("src/main/resources/birds-of-paradise.jpg")
    ImageRelation(sqlContext, parameters, schema)
  }

  override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
    ImageRelation(sqlContext, parameters, data.schema)
  }
} 
Author: miguel0afd | Project: sparkCV | Lines: 31 | Source: DefaultSource.scala
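The sparkCV provider only asserts that a path parameter is present before delegating to ImageRelation (not shown here). A usage sketch with a placeholder path:

val images = sqlContext.read
  .format("org.apache.spark.sql.sparkcv")
  .option("path", "/data/images")   // required by the assert in createRelation
  .load()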


Example 12: DefaultSource

// Package name and imported dependencies
package solr

import com.lucidworks.spark.SolrRelation
import com.lucidworks.spark.util.Constants
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.sources.{DataSourceRegister, BaseRelation, CreatableRelationProvider, RelationProvider}

class DefaultSource extends RelationProvider with CreatableRelationProvider with DataSourceRegister {

  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
    try {
      return new SolrRelation(parameters, sqlContext)
    } catch {
      case re: RuntimeException => throw re
      case e: Exception => throw new RuntimeException(e)
    }
  }

  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      df: DataFrame): BaseRelation = {
    try {
      // TODO: What to do with the saveMode?
      val solrRelation: SolrRelation = new SolrRelation(parameters, sqlContext, Some(df))
      solrRelation.insert(df, overwrite = true)
      solrRelation
    } catch {
      case re: RuntimeException => throw re
      case e: Exception => throw new RuntimeException(e)
    }
  }

  override def shortName(): String = Constants.SOLR_FORMAT
} 
Author: OpenPOWER-BigData | Project: HDP2.5-spark-solr | Lines: 37 | Source: DefaultSource.scala
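Because the provider lives in the package solr and also registers a short name via DataSourceRegister, it can be addressed as format("solr"). The connection options below (zkhost, collection) are assumptions about SolrRelation's parameters and do not appear in this snippet:

val events = sqlContext.read
  .format("solr")                      // resolves to solr.DefaultSource
  .option("zkhost", "zk1:2181/solr")   // assumed parameter name
  .option("collection", "events")      // assumed parameter name
  .load()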


Example 13: DefaultSource

// Package name and imported dependencies
package com.skapane.spark.xls

import org.apache.spark.sql.sources.RelationProvider
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation

class DefaultSource extends RelationProvider {

  override def createRelation(
    sqlContext: SQLContext,
    parameters: Map[String, String]): BaseRelation = {

    val path = parameters("path")

    val normalizeNames = parameters.get("normalizeNames") match {
      case Some(e) => e.toBoolean
      case _ => true
    }

    XlsRelation(
      path,
      parameters.get("sheet"),
      normalizeNames)(sqlContext)

  }
} 
Author: skapane | Project: spark-xls | Lines: 27 | Source: DefaultSource.scala
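The XLS provider reads a mandatory path plus optional sheet and normalizeNames options. A sketch with placeholder values:

val sheet = sqlContext.read
  .format("com.skapane.spark.xls")
  .option("path", "/data/report.xls")
  .option("sheet", "Sheet1")          // optional: passed through as an Option to XlsRelation
  .option("normalizeNames", "true")   // optional: defaults to true
  .load()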


Example 14: DatasetRelation

// Package name and imported dependencies
package com.springml.spark.sftp

import com.databricks.spark.avro._
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType


case class DatasetRelation(
    fileLocation: String,
    fileType: String,
    inferSchema: String,
    header: String,
    delimiter: String,
    customSchema: StructType,
    sqlContext: SQLContext) extends BaseRelation with TableScan {

    private val logger = Logger.getLogger(classOf[DatasetRelation])

    val df = read()

    private def read(): DataFrame = {
      var dataframeReader = sqlContext.read
      if (customSchema != null) {
        dataframeReader = dataframeReader.schema(customSchema)
      }

      var df: DataFrame = null
      if (fileType.equals("json")) {
        df = dataframeReader.json(fileLocation)
      } else if (fileType.equals("parquet")) {
        df = dataframeReader.parquet(fileLocation)
      } else if (fileType.equals("csv")) {
        df = dataframeReader.
          option("header", header).
          option("delimiter", delimiter).
          option("inferSchema", inferSchema).
          csv(fileLocation)
      } else if (fileType.equals("avro")) {
        df = dataframeReader.avro(fileLocation)
      }

      df
    }

    override def schema: StructType = {
      df.schema
    }

    override def buildScan(): RDD[Row] = {
      df.rdd
    }

} 
Author: springml | Project: spark-sftp | Lines: 57 | Source: DatasetRelation.scala
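Only the relation appears in this excerpt, so the most direct way to exercise it is to construct it and hand it to SQLContext.baseRelationToDataFrame; the file location and CSV options below are placeholders:

val relation = DatasetRelation(
  fileLocation = "/tmp/downloaded/sample.csv",
  fileType     = "csv",
  inferSchema  = "true",
  header       = "true",
  delimiter    = ",",
  customSchema = null,          // or a StructType to bypass inference
  sqlContext   = sqlContext)

val df = sqlContext.baseRelationToDataFrame(relation)
df.show()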



Note: The org.apache.spark.sql.sources.BaseRelation examples in this article were collected from GitHub, MSDocs, and other source-code and documentation platforms. The snippets were selected from open-source projects contributed by their respective authors, and the copyright remains with those authors; please consult each project's license before redistributing or reusing the code.

