本文整理汇总了Scala中org.apache.spark.sql.sources.BaseRelation类的典型用法代码示例。如果您正苦于以下问题:Scala BaseRelation类的具体用法?Scala BaseRelation怎么用?Scala BaseRelation使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了BaseRelation类的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: DefaultSource
//设置package包名称以及导入依赖的类
package com.jasonfeist.spark.tika
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types._
class DefaultSource
extends RelationProvider with SchemaRelationProvider {
def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
parameters.getOrElse("path", sys.error("No path specified."))
new TikaMetadataRelation(
parameters.get("path").get,
schema,
new MetadataExtractor(),
new FieldDataExtractor())(sqlContext)
}
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
val struct =
StructType(
StructField("detectedtype", StringType, true) ::
StructField("language", StringType, true) ::
StructField("filename", StringType, true) ::
StructField("author", StringType, true) ::
StructField("text", StringType, true) ::
StructField("creation-date", TimestampType, true) ::
StructField("title", StringType, true) ::
StructField("content-length", IntegerType, true) ::
StructField("last-modified", DateType, true) :: Nil
)
createRelation(sqlContext, parameters, struct)
}
}
开发者ID:jasonfeist,项目名称:tika-spark-datasource,代码行数:36,代码来源:DefaultSource.scala
示例2: TikaMetadataRelation
//设置package包名称以及导入依赖的类
package com.jasonfeist.spark.tika
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{StructType}
import org.slf4j.LoggerFactory
class TikaMetadataRelation protected[tika] (path: String,
userSchema: StructType,
metadataExtractor: MetadataExtractor,
fieldDataExtractor: FieldDataExtractor)
(@transient val sqlContext: SQLContext)
extends BaseRelation with TableScan with Serializable {
val logger = LoggerFactory.getLogger(classOf[TikaMetadataRelation])
override def schema: StructType = this.userSchema
override def buildScan(): RDD[Row] = {
val rdd = sqlContext
.sparkContext.binaryFiles(path)
rdd.map(extractFunc(_))
}
def extractFunc(
file: (String, PortableDataStream)
) : Row =
{
val extractedData = metadataExtractor.extract(file)
val rowArray = new Array[Any](schema.fields.length)
var index = 0
while (index < schema.fields.length) {
val field = schema(index)
val fieldData = fieldDataExtractor.matchedField(field.name,
field.dataType, extractedData._1, file._1, extractedData._2,
extractedData._3)
rowArray(index) = fieldData
index = index + 1
}
Row.fromSeq(rowArray)
}
}
开发者ID:jasonfeist,项目名称:tika-spark-datasource,代码行数:47,代码来源:TikaMetadataRelation.scala
示例3: StudyRelation
//设置package包名称以及导入依赖的类
package com.study.spark.datasource
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
class StudyRelation(parameters: Map[String, String])(@transient val sqlContext: SQLContext)
extends BaseRelation with TableScan {
override def schema: StructType = {
// ??? ?? ?????, ?? ??? ???? ????. ???? ?????? ???? ??????, ???? ?? ??? ????
val fields: Array[StructField] = new Array[StructField](3)
fields.update(0, new StructField("field1", StringType))
fields.update(1, new StructField("field2", StringType))
fields.update(2, new StructField("field2", StringType))
new StructType(fields.asInstanceOf[Array[StructField]])
}
// RDD[Row]? ???? StudyRDD? ???.
override def buildScan(): RDD[Row] = new StudyRDD(sqlContext, schema)
}
开发者ID:hackpupu,项目名称:LML,代码行数:24,代码来源:StudyRelation.scala
示例4: sample
//设置package包名称以及导入依赖的类
package com.rishabh.spark.datasource.s3
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
case class sample(id: Integer)
class S3Relation(accesskey: String, secretkey: String, fileType: String, bucket: String, path:
String, write: Boolean)
(@transient
val sqlContext: SQLContext) extends BaseRelation with TableScan {
import sqlContext.implicits._
val dummyData = Seq(sample(1))
var df = sqlContext.sparkContext.parallelize(dummyData, 4).toDF()
val s3Path = "s3a://" + bucket + path
val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set("fs.s3a.access.key", accesskey)
hadoopConf.set("fs.s3a.secret.key", secretkey)
override def schema: StructType = {
fileType match {
case "json" =>
df = sqlContext.read.json(s3Path)
case "csv" =>
df = sqlContext.read.format("com.databricks.spark.csv").load(s3Path)
case "parquet" =>
df = sqlContext.read.parquet(s3Path)
}
df.schema
}
override def buildScan(): RDD[Row] = {
df.rdd
}
}
开发者ID:rishabhbhardwaj,项目名称:spark-datasource-s3,代码行数:44,代码来源:S3Relation.scala
示例5: DefaultSource
//设置package包名称以及导入依赖的类
package com.rishabh.spark.datasource.s3
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
class DefaultSource extends RelationProvider with CreatableRelationProvider {
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]):
BaseRelation = {
val accessKey = parameters.getOrElse("accesskey", sys.error("accesskey is required"))
val secretKey = parameters.getOrElse("secretkey", sys.error("secretkey is required"))
val fileType = parameters.getOrElse("type", sys.error("filetype is required"))
val path = parameters.getOrElse("path", sys.error("path is required"))
val bucket = parameters.getOrElse("bucketName", sys.error("bucket is required"))
new S3Relation(accessKey, secretKey, fileType, bucket, path, false)(sqlContext)
}
override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String,
String], data: DataFrame): BaseRelation = {
val accesskey = parameters.getOrElse("accesskey",sys.error("accesskey is required"))
val secretkey = parameters.getOrElse("secretkey", sys.error("secretkey is required"))
val bucket = parameters.getOrElse("bucketName", sys.error("bucket is required"))
val fileType = parameters.getOrElse("type", sys.error("filetype is required"))
val path = parameters.getOrElse("path", sys.error("path is required"))
val supported = List("json", "parquet", "csv")
if (!supported.contains(fileType)) {
sys.error("fileType " + fileType + " not supported.")
}
val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set("fs.s3a.access.key", accesskey)
hadoopConf.set("fs.s3a.secret.key", secretkey)
val s3Path = "s3a://" + bucket + path
doSave(fileType, data, s3Path)
new S3Relation(accesskey, secretkey, fileType, bucket, path, true)(sqlContext)
}
private def doSave(fileType: String, dataFrame: DataFrame, path: String) = {
fileType match {
case "json" =>
dataFrame.write.json(path)
case "parquet" =>
dataFrame.write.parquet(path)
case "csv" =>
dataFrame.write.format("com.databricks.spark.csv").save(path)
}
}
}
开发者ID:rishabhbhardwaj,项目名称:spark-datasource-s3,代码行数:51,代码来源:DefaultSource.scala
示例6: NTriplesRelation
//设置package包名称以及导入依赖的类
package net.sansa_stack.inference.spark.data.loader.sql
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import net.sansa_stack.inference.utils.NTriplesStringToRDFTriple
class NTriplesRelation(location: String, userSchema: StructType)
(@transient val sqlContext: SQLContext)
extends BaseRelation
with TableScan
with Serializable {
override def schema: StructType = {
if (this.userSchema != null) {
this.userSchema
}
else {
StructType(
Seq(
StructField("s", StringType, true),
StructField("p", StringType, true),
StructField("o", StringType, true)
))
}
}
override def buildScan(): RDD[Row] = {
val rdd = sqlContext
.sparkContext
.textFile(location)
val converter = new NTriplesStringToRDFTriple()
val rows = rdd.flatMap(x => converter.apply(x)).map(t => Row.fromSeq(Seq(t.s, t.p, t.o)))
rows
}
}
开发者ID:SANSA-Stack,项目名称:SANSA-Inference,代码行数:40,代码来源:NTriplesRelation.scala
示例7: DefaultSource
//设置package包名称以及导入依赖的类
package net.sansa_stack.inference.spark.data.loader.sql
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType
class DefaultSource extends RelationProvider with SchemaRelationProvider {
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String])
: BaseRelation = {
createRelation(sqlContext, parameters, null)
}
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]
, schema: StructType)
: BaseRelation = {
parameters.getOrElse("path", sys.error("'path' must be specified for our data."))
return new NTriplesRelation(parameters.get("path").get, schema)(sqlContext)
}
}
开发者ID:SANSA-Stack,项目名称:SANSA-Inference,代码行数:20,代码来源:DefaultSource.scala
示例8: GDBRelation
//设置package包名称以及导入依赖的类
package com.esri.gdb
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
case class GDBRelation(gdbPath: String, gdbName: String, numPartition: Int)
(@transient val sqlContext: SQLContext)
extends BaseRelation with Logging with TableScan {
override val schema = inferSchema()
private def inferSchema() = {
val sc = sqlContext.sparkContext
GDBTable.findTable(gdbPath, gdbName, sc.hadoopConfiguration) match {
case Some(catTab) => {
val table = GDBTable(gdbPath, catTab.hexName, sc.hadoopConfiguration)
try {
table.schema()
} finally {
table.close()
}
}
case _ => {
log.error(s"Cannot find '$gdbName' in $gdbPath, creating an empty schema !")
StructType(Seq.empty[StructField])
}
}
}
override def buildScan(): RDD[Row] = {
GDBRDD(sqlContext.sparkContext, gdbPath, gdbName, numPartition)
}
}
开发者ID:mraad,项目名称:spark-gdb,代码行数:38,代码来源:GDBRelation.scala
示例9: DefaultSource
//设置package包名称以及导入依赖的类
package com.github.traviscrawford.spark.dynamodb
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.RelationProvider
import org.apache.spark.sql.sources.SchemaRelationProvider
import org.apache.spark.sql.types.StructType
private[dynamodb] class DefaultSource
extends RelationProvider with SchemaRelationProvider {
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String])
: BaseRelation = getDynamoDBRelation(sqlContext, parameters)
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
schema: StructType)
: BaseRelation = getDynamoDBRelation(sqlContext, parameters, Some(schema))
private def getDynamoDBRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
maybeSchema: Option[StructType] = None)
: DynamoDBRelation = {
val tableName = parameters.getOrElse("table",
throw new IllegalArgumentException("Required parameter 'table' was unspecified.")
)
DynamoDBRelation(
tableName = tableName,
maybeFilterExpression = parameters.get("filter_expression"),
maybePageSize = parameters.get("page_size"),
maybeRegion = parameters.get("region"),
maybeSegments = parameters.get("segments"),
maybeRateLimit = parameters.get("rate_limit_per_segment").map(Integer.parseInt),
maybeSchema = maybeSchema,
maybeCredentials = parameters.get("aws_credentials_provider"),
maybeEndpoint = parameters.get("endpoint"))(sqlContext)
}
}
开发者ID:traviscrawford,项目名称:spark-dynamodb,代码行数:45,代码来源:DefaultSource.scala
示例10: DefaultSource
//设置package包名称以及导入依赖的类
package com.springml.spark.workday
import com.springml.spark.workday.model.{WWSInput, XPathInput}
import com.springml.spark.workday.util.CSVUtil
import org.apache.log4j.Logger
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
class DefaultSource extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
@transient val logger = Logger.getLogger(classOf[DefaultSource])
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
createRelation(sqlContext, parameters, null)
}
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String],
schema: StructType): BaseRelation = {
val username = param(parameters, "username")
val password = param(parameters, "password")
val wwsEndpoint = param(parameters, "wwsEndpoint")
val objectTag = param(parameters, "objectTagPath")
val detailsTag = param(parameters, "detailsTagPath")
val request = param(parameters, "request")
val xpath = param(parameters, "xpathMap")
val namespacePrefix = parameters.get("namespacePrefixMap")
val wwsInput = new WWSInput(username, password, wwsEndpoint, request)
val xPathInput = new XPathInput(objectTag, detailsTag)
CSVUtil.populateXPathInput(xpath, xPathInput)
xPathInput.namespaceMap = CSVUtil.readCSV(namespacePrefix.get)
logger.debug("Namespace Map" + xPathInput.namespaceMap)
val records = new WWSReader(wwsInput, xPathInput) read()
new DatasetRelation(records, sqlContext, schema)
}
override def createRelation(sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
logger.error("Save not supported by workday connector")
throw new UnsupportedOperationException
}
private def param(parameters: Map[String, String],
paramName: String) : String = {
val paramValue = parameters.getOrElse(paramName,
sys.error(s"""'$paramName' must be specified for Spark Workday package"""))
if ("password".equals(paramName)) {
logger.debug("Param " + paramName + " value " + paramValue)
}
paramValue
}
}
开发者ID:springml,项目名称:spark-workday,代码行数:62,代码来源:DefaultSource.scala
示例11: DefaultSource
//设置package包名称以及导入依赖的类
package org.apache.spark.sql.sparkcv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType
import org.bytedeco.javacpp.opencv_core.IplImage
import org.bytedeco.javacpp.opencv_imgcodecs.cvLoadImage
class DefaultSource
extends RelationProvider
with SchemaRelationProvider
with CreatableRelationProvider
with Logging {
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
createRelation(sqlContext, parameters, new StructType())
}
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
assert(parameters.get("path").isDefined, "path parameter is required")
val image: IplImage = cvLoadImage("src/main/resources/birds-of-paradise.jpg")
ImageRelation(sqlContext, parameters, schema)
}
override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
ImageRelation(sqlContext, parameters, data.schema)
}
}
开发者ID:miguel0afd,项目名称:sparkCV,代码行数:31,代码来源:DefaultSource.scala
示例12: DefaultSource
//设置package包名称以及导入依赖的类
package solr
import com.lucidworks.spark.SolrRelation
import com.lucidworks.spark.util.Constants
import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.sources.{DataSourceRegister, BaseRelation, CreatableRelationProvider, RelationProvider}
class DefaultSource extends RelationProvider with CreatableRelationProvider with DataSourceRegister {
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
try {
return new SolrRelation(parameters, sqlContext)
} catch {
case re: RuntimeException => throw re
case e: Exception => throw new RuntimeException(e)
}
}
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
df: DataFrame): BaseRelation = {
try {
// TODO: What to do with the saveMode?
val solrRelation: SolrRelation = new SolrRelation(parameters, sqlContext, Some(df))
solrRelation.insert(df, overwrite = true)
solrRelation
} catch {
case re: RuntimeException => throw re
case e: Exception => throw new RuntimeException(e)
}
}
override def shortName(): String = Constants.SOLR_FORMAT
}
开发者ID:OpenPOWER-BigData,项目名称:HDP2.5-spark-solr,代码行数:37,代码来源:DefaultSource.scala
示例13: DefaultSource
//设置package包名称以及导入依赖的类
package com.skapane.spark.xls
import org.apache.spark.sql.sources.RelationProvider
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
class DefaultSource extends RelationProvider {
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val path = parameters("path")
val normalizeNames = parameters.get("normalizeNames") match {
case Some(e) => e.toBoolean
case _ => true
}
XlsRelation(
path,
parameters.get("sheet"),
normalizeNames)(sqlContext)
}
}
开发者ID:skapane,项目名称:spark-xls,代码行数:27,代码来源:DefaultSource.scala
示例14: DatasetRelation
//设置package包名称以及导入依赖的类
package com.springml.spark.sftp
import com.databricks.spark.avro._
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType
case class DatasetRelation(
fileLocation: String,
fileType: String,
inferSchema: String,
header: String,
delimiter: String,
customSchema: StructType,
sqlContext: SQLContext) extends BaseRelation with TableScan {
private val logger = Logger.getLogger(classOf[DatasetRelation])
val df = read()
private def read(): DataFrame = {
var dataframeReader = sqlContext.read
if (customSchema != null) {
dataframeReader = dataframeReader.schema(customSchema)
}
var df: DataFrame = null
if (fileType.equals("json")) {
df = dataframeReader.json(fileLocation)
} else if (fileType.equals("parquet")) {
df = dataframeReader.parquet(fileLocation)
} else if (fileType.equals("csv")) {
df = dataframeReader.
option("header", header).
option("delimiter", delimiter).
option("inferSchema", inferSchema).
csv(fileLocation)
} else if (fileType.equals("avro")) {
df = dataframeReader.avro(fileLocation)
}
df
}
override def schema: StructType = {
df.schema
}
override def buildScan(): RDD[Row] = {
df.rdd
}
}
开发者ID:springml,项目名称:spark-sftp,代码行数:57,代码来源:DatasetRelation.scala
注:本文中的org.apache.spark.sql.sources.BaseRelation类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论