本文整理汇总了Scala中org.apache.spark.sql.SQLContext类的典型用法代码示例。如果您正苦于以下问题：Scala SQLContext类的具体用法？Scala SQLContext怎么用？Scala SQLContext使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SQLContext类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: Person
//设置package包名称以及导入依赖的类
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
/** One person record; `toDF()` derives the `name` and `age` columns from these fields. */
case class Person(name: String, age: Int)

/** Builds a tiny in-memory table of people and prints everyone older than 30. */
object SimpleApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    // Brings the RDD-to-DataFrame conversion (`toDF`) into scope.
    import sqlContext.implicits._

    val people = sc
      .parallelize(List(("mike", 24), ("joe", 34), ("jack", 55)))
      .map { case (name, age) => Person(name, age) }
      .toDF()

    // Expose the DataFrame to SQL under the table name used by the query below.
    people.registerTempTable("people")

    sqlContext.sql("select * from people where age > 30").show()
  }
}
开发者ID:mykumar,项目名称:SparkScalaInternalExperiements,代码行数:25,代码来源:SimpleApp.scala
示例2: SimpleApp
//设置package包名称以及导入依赖的类
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.ml.clustering.LDA
import org.apache.spark.mllib.linalg.{VectorUDT, Vectors}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StructField, StructType}
/**
 * Fits an LDA topic model on space-separated numeric vectors read from
 * `/tmp/lda_data.txt` and prints the topics and the transformed dataset.
 */
object SimpleApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("Simple Application")
      .set("spark.ui.enabled", "false")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Load the data: each non-empty line becomes a one-column Row holding a dense vector.
    val nonEmptyLines = sc.textFile("/tmp/lda_data.txt").filter(line => line.nonEmpty)
    val rowRDD = nonEmptyLines.map { line =>
      val values = line.split(" ").map(_.toDouble)
      Row(Vectors.dense(values))
    }

    // Single non-nullable vector column; the same name is handed to LDA as the features column.
    val schema = StructType(Array(StructField("name", new VectorUDT, false)))
    val dataset = sqlContext.createDataFrame(rowRDD, schema)
    dataset.show()

    val lda = new LDA().setK(10).setMaxIter(10).setFeaturesCol("name")
    val model = lda.fit(dataset)
    val transformed = model.transform(dataset)

    // Computed but not printed, exactly as in the original example.
    val ll = model.logLikelihood(dataset)
    val lp = model.logPerplexity(dataset)

    // Describe each topic by its top 3 terms.
    val topics = model.describeTopics(3)

    // Show the results without truncating column contents.
    topics.show(false)
    transformed.show(false)
  }
}
开发者ID:mykumar,项目名称:SparkScalaInternalExperiements,代码行数:41,代码来源:SimpleApp.scala
示例3: LRCV
//设置package包名称以及导入依赖的类
package com.ferhtaydn.rater
import org.apache.spark.SparkContext
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{ StringIndexerModel, VectorAssembler }
import org.apache.spark.ml.tuning.{ CrossValidator, CrossValidatorModel, ParamGridBuilder }
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.sql.{ DataFrame, Row, SQLContext }
/**
 * Logistic regression trained through 10-fold cross validation.
 *
 * The pipeline assembles the numeric columns into `features`, applies
 * `standardScaler("features")` and then fits a logistic regression that reads
 * the `scaledFeatures` column.
 *
 * NOTE(review): `standardScaler`, `stringIndexer`, `printBinaryMetrics` and
 * `confusionMatrix` are not defined in this class; presumably they come from
 * the enclosing package - confirm.
 *
 * @param sc Spark context used to create the SQL context
 */
class LRCV(sc: SparkContext) {

  implicit val sqlContext: SQLContext = new SQLContext(sc)

  val lr = new LogisticRegression().setMaxIter(10).setFeaturesCol("scaledFeatures")

  // Cross validation searches over these two regularization strengths.
  val paramGrid = new ParamGridBuilder()
    .addGrid(lr.regParam, Array(0.1, 0.01))
    .build()

  val assembler = new VectorAssembler()
    .setInputCols(Array("gender", "age", "weight", "height", "indexedJob"))
    .setOutputCol("features")

  val pipeline = new Pipeline()
    .setStages(Array(assembler, standardScaler("features"), lr))

  val cv = new CrossValidator()
    .setEstimator(pipeline)
    .setEvaluator(new BinaryClassificationEvaluator)
    .setEstimatorParamMaps(paramGrid)
    .setNumFolds(10)

  /**
   * Indexes the `job` column, trains on a random 80% split and evaluates on
   * the remaining 20%.
   *
   * @param df input data containing the assembler's input columns, `job` and `label`
   * @return the fitted job indexer, the cross-validated model and the
   *         confusion matrix computed on the held-out split
   */
  def train(df: DataFrame): (StringIndexerModel, CrossValidatorModel, Matrix) = {
    // Index strings on ALL data so that no job value is missing from the index;
    // the alternative would be to assign a value to each job manually, like gender.
    val indexerModel = stringIndexer("job").fit(df)
    val indexed = indexerModel.transform(df)

    val splits = indexed.randomSplit(Array(0.8, 0.2))
    val training = splits(0).cache()
    val test = splits(1)

    val cvModel = cv.fit(training)

    // The scraped source had the case arrow garbled to '?'; restored as `=>`.
    val predictionAndLabels = cvModel
      .transform(test)
      .select("label", "prediction")
      .map {
        case Row(label: Double, prediction: Double) =>
          (prediction, label)
      }

    printBinaryMetrics(predictionAndLabels)

    (indexerModel, cvModel, confusionMatrix(predictionAndLabels))
  }
}
开发者ID:ferhtaydn,项目名称:canceRater,代码行数:62,代码来源:LRCV.scala
示例4: TikaLanguageAggregationExample
//设置package包名称以及导入依赖的类
package com.jasonfeist.spark.tika.example
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Loads files through the `com.jasonfeist.spark.tika` data source and prints
 * how many rows there are per `Language` value.
 */
object TikaLanguageAggregationExample {

  /** Expects the input path as the first argument; does nothing when it is missing or null. */
  def main(args: Array[String]): Unit = {
    val hasInputPath = args.length != 0 && args(0) != null
    if (hasInputPath) {
      val sparkConf = new SparkConf().setAppName("Tika Language Aggregation Example")
      val sc: SparkContext = new SparkContext(sparkConf)
      val sqlContext: SQLContext = new SQLContext(sc)

      val loaded: DataFrame = sqlContext.read
        .format("com.jasonfeist.spark.tika")
        .load(args(0))
      val countsByLanguage: DataFrame = loaded.groupBy("Language").count()

      countsByLanguage.show
    }
  }
}
开发者ID:jasonfeist,项目名称:tika-spark-datasource,代码行数:24,代码来源:TikaLanguageAggregationExample.scala
示例5: RealEstateData
//设置package包名称以及导入依赖的类
package fr.grislain
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.{ SQLContext, DataFrame, Row }
import org.apache.spark.sql.types._
/**
 * Loads `../data/insee_notaires.csv` into a DataFrame with `year`, `quarter`
 * and one Double column for each of the names "75001" to "75020".
 *
 * Initializing this object prints a start-up message and creates a local
 * SparkContext and SQLContext.
 */
object RealEstateData {
  println("Starting real_estate_price")

  val conf = new SparkConf().setAppName("real_estate_price").setMaster("local")
  val context = new SparkContext(conf)
  val sqlContext = new SQLContext(context)

  /**
   * Parses the CSV file into rows.
   *
   * The first CSV column is ignored. The second column is split into a year
   * (its first four characters) and a quarter (everything after the fifth
   * character). Every remaining column is parsed as a Double.
   *
   * @return a DataFrame with the schema described on the object
   */
  def dataFrame: DataFrame = {
    val input = context.textFile("../data/insee_notaires.csv")

    // The header is the first line of the file, so it lives in partition 0 only.
    // Dropping the first line of EVERY partition (as `mapPartitions { _.drop(1) }`
    // did) silently loses one data row per extra partition.
    val dataLines = input.mapPartitionsWithIndex { (partitionIndex, lines) =>
      if (partitionIndex == 0) lines.drop(1) else lines
    }

    // Parsing stays inline in the closure: routing it through a method of this
    // object would make executors initialize the object, and with it a SparkContext.
    val rows = dataLines map { line =>
      Row.fromSeq(line.split(",").view.zipWithIndex filter { e => e._2 > 0 } flatMap {
        case (t, 1) => Seq[Any](t.take(4).toInt, t.drop(5).toInt)
        case (p, _) => Seq[Any](p.toDouble)
      })
    }

    // Twenty Double columns named "75001" .. "75020", in ascending order.
    val priceColumns = (75001 to 75020).map(code => StructField(code.toString, DoubleType))
    val schema = StructType(
      StructField("year", IntegerType) +: StructField("quarter", IntegerType) +: priceColumns)

    sqlContext.createDataFrame(rows, schema)
  }
}
开发者ID:ngrislain,项目名称:french_real_estate,代码行数:50,代码来源:RealEstateData.scala
示例6: StudyRDD
//设置package包名称以及导入依赖的类
package com.study.spark.datasource
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
/**
 * RDD of rows whose content is produced by a `StudyReader` per partition.
 *
 * @param sqlContext supplies the SparkContext this RDD is attached to
 * @param schema     schema handed to every `StudyReader`
 */
class StudyRDD(sqlContext: SQLContext, schema: StructType) extends RDD[Row](sqlContext.sparkContext, deps=Nil) {

  /** Reads one partition by delegating to a new `StudyReader`. */
  @DeveloperApi
  override def compute(split: Partition, context: TaskContext): Iterator[Row] =
    new StudyReader(context, schema, split)

  // The partition layout is fixed: always exactly two partitions,
  // with indices 0 and 1, regardless of the data.
  override protected def getPartitions: Array[Partition] =
    Array[Partition](
      new Partition {
        override def index: Int = 0
      },
      new Partition {
        override def index: Int = 1
      }
    )
}
开发者ID:hackpupu,项目名称:LML,代码行数:27,代码来源:StudyRDD.scala
示例7: SimpleApp
//设置package包名称以及导入依赖的类
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
/**
 * Reads the `member` table over JDBC in 5 partitions and prints the `hp`
 * value with the highest number of distinct `up` values.
 */
object SimpleApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Credentials come from the environment rather than being hard-coded.
    val members = sqlContext.read
      .format("jdbc")
      .option("url", "jdbc:mysql://statsdb02p-am-tor02:3306/aminno")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "member")
      .option("user", System.getenv("MYSQL_USERNAME"))
      .option("password", System.getenv("MYSQL_PASSWORD"))
      // Partitioned read: the `hp` range [0, 44000000] is split across 5 partitions.
      .option("partitionColumn", "hp")
      .option("lowerBound", "0")
      .option("upperBound", "44000000")
      .option("numPartitions", "5")
      .load()
    members.registerTempTable("achat")

    // head() keeps only the first row of the descending ordering.
    val topRow = sqlContext
      .sql("select hp, count(distinct up) as cnt from achat group by hp order by cnt desc")
      .head()
    println(s"--------see here!------->${topRow.mkString(" ")}")
  }
}
开发者ID:mykumar,项目名称:SparkScalaInternalExperiements,代码行数:27,代码来源:SimpleApp.scala
示例8: ParquetS3Backup
//设置package包名称以及导入依赖的类
package com.unity.analytics.spark.utils.parquet
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{Logging, SparkConf, SparkContext}
/**
 * Rewrites Parquet directories with a different number of files, either by
 * merging (coalesce) or by splitting (repartition), and backs up / restores
 * whole sets of directories described by a `BackupMetadata`.
 */
object ParquetS3Backup extends Logging {
  implicit val formats = org.json4s.DefaultFormats

  /** Parses the command line and runs a single merge or split, as selected by the configuration. */
  def main(args: Array[String]): Unit = {
    val config = new ParquetS3BackupConfiguration(args)
    val sparkConf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sqlContext = new SQLContext(new SparkContext(sparkConf))

    if (config.merge()) {
      merge(sqlContext, config.srcDir(), config.destDir(), config.numFiles())
    } else {
      split(sqlContext, config.srcDir(), config.destDir(), config.numFiles())
    }
  }

  /** Reads the Parquet files in `srcDir`, coalesces them to `destNumFiles` partitions and overwrites `destDir`. */
  def merge(sqlContext: SQLContext, srcDir: String, destDir: String, destNumFiles: Int): Unit = {
    logInfo(s"ParquetS3Backup merge - srcDir: $srcDir, destDir: $destDir, destNumFiles: $destNumFiles")
    val merged = sqlContext.read.parquet(srcDir).coalesce(destNumFiles)
    merged.write.mode(SaveMode.Overwrite).parquet(destDir)
  }

  /** Reads the Parquet files in `srcDir`, repartitions them to `destNumFiles` partitions and overwrites `destDir`. */
  def split(sqlContext: SQLContext, srcDir: String, destDir: String, destNumFiles: Int): Unit = {
    logInfo(s"ParquetS3Backup split - srcDir: $srcDir, destDir: $destDir, destNumFiles: $destNumFiles")
    val repartitioned = sqlContext.read.parquet(srcDir).repartition(destNumFiles)
    repartitioned.write.mode(SaveMode.Overwrite).parquet(destDir)
  }

  /**
   * Backs up every entry from its `srcDir` to its `destDir`, producing
   * `destNumFiles` files: merges when the target count is not larger than the
   * source count, splits otherwise.
   */
  def backup(sqlContext: SQLContext, backupMetadata: BackupMetadata): Unit = {
    for (entry <- backupMetadata.backupEntries) {
      val shrinking = entry.destNumFiles <= entry.srcNumFiles
      if (shrinking) {
        merge(sqlContext, entry.srcDir, entry.destDir, entry.destNumFiles)
      } else {
        split(sqlContext, entry.srcDir, entry.destDir, entry.destNumFiles)
      }
    }
  }

  /**
   * Restores every entry from its `destDir` back to its `srcDir`, bringing
   * back the original `srcNumFiles` file count.
   */
  def restore(sqlContext: SQLContext, backupMetadata: BackupMetadata): Unit = {
    for (entry <- backupMetadata.backupEntries) {
      val shrinking = entry.srcNumFiles <= entry.destNumFiles
      if (shrinking) {
        merge(sqlContext, entry.destDir, entry.srcDir, entry.srcNumFiles)
      } else {
        split(sqlContext, entry.destDir, entry.srcDir, entry.srcNumFiles)
      }
    }
  }
}
开发者ID:UnityTech,项目名称:parquet-s3-backup,代码行数:59,代码来源:ParquetS3Backup.scala
示例9: SimpleApp
//设置package包名称以及导入依赖的类
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SQLContext}
/**
 * Reads the `msg` table over JDBC, counts the words of the `message` column
 * and writes the counts to the `msg_word_count` table.
 */
object SimpleApp {
  val url = "jdbc:mysql://bigdata-master:3306/nlp"
  val driver = "com.mysql.jdbc.Driver"
  // Credentials are taken from the environment.
  val user = System.getenv("MYSQL_USERNAME")
  val pwd = System.getenv("MYSQL_PASSWORD")

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    val messages = sqlContext.read
      .format("jdbc")
      .option("url", url)
      .option("driver", driver)
      .option("dbtable", "msg")
      .option("user", user)
      .option("password", pwd)
      .load()
    messages.registerTempTable("t_msg")

    val msgDF = sqlContext.sql("select message from t_msg")
    msgDF.printSchema()

    // Lower-cases a message, splits it on single spaces, strips every
    // non-alphanumeric character from each token and removes duplicates.
    val cleaner: String => Array[String] = msg =>
      msg.toLowerCase.split(" ").map(_.replaceAll("[^a-zA-Z0-9]", "")).distinct

    // One output row per distinct cleaned word of each message.
    val wordDF = msgDF.explode("message", "word")((r: String) => cleaner(r))
    wordDF.registerTempTable("words")

    val wordCount =
      sqlContext.sql("select word, count(1) as cnt from words group by word order by cnt desc")
    println(wordCount.count())

    save(wordCount, "msg_word_count")
  }

  /** Creates `table` at `url` and writes `dataFrame` into it using the shared credentials. */
  def save(dataFrame: DataFrame, table: String): Unit = {
    val connectionProps = new java.util.Properties()
    connectionProps.setProperty("user", user)
    connectionProps.setProperty("password", pwd)
    connectionProps.setProperty("driver", driver)

    dataFrame.write.jdbc(url, table, connectionProps)
  }
}
开发者ID:mykumar,项目名称:SparkScalaInternalExperiements,代码行数:50,代码来源:SimpleApp.scala
示例10: SparkSQL
//设置package包名称以及导入依赖的类
package utils
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
object SparkSQL {

  /**
   * Starts a local 4-thread Spark application, reads
   * `ecommerce-event-data.json`, prints every `user_name` value and then the
   * inferred schema.
   */
  def simpleSparkSQLApp: Unit = {
    val driverHost = "localhost"

    val sparkConf = new SparkConf(false) // skip loading external settings
      .setMaster("local[4]") // run locally with enough threads
      .setAppName("firstSparkApp")
      .set("spark.logConf", "true")
      .set("spark.driver.host", driverHost)
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val path = "ecommerce-event-data.json"
    val events = sqlContext.read.json(path)

    // collect() brings all user names to the driver before printing.
    for (row <- events.select("user_name").collect()) {
      println(row.getString(0))
    }
    events.printSchema()
  }
}
开发者ID:listaction,项目名称:spark-test-1,代码行数:32,代码来源:SparkSQL.scala
示例11: BigDataMakerTest
//设置package包名称以及导入依赖的类
package com.drakeconsulting.big_data_maker
import org.scalatest.FunSuite
import com.holdenkarau.spark.testing.SharedSparkContext
import org.apache.spark.sql.SQLContext
/** Tests for `BigData` column handling and DataFrame generation. */
class BigDataMakerTest extends FunSuite with SharedSparkContext {

  // Every test gets its own SQLContext and a BigData built with the same arguments.
  private def newBigData() = new BigData(new SQLContext(sc), "/tmp/b", 5, 100)

  test("first") {
    val bigData = newBigData()
    bigData.addColumn(new StringConstant("f1", "abc"))
    bigData.addColumn(new StringConstant("f2", "def"))

    val generated = bigData._createDataFrame
    generated.show

    assert(500 === generated.count)
    assert(2 === generated.columns.length)
  }

  test("col names") {
    val bigData = newBigData()
    bigData.addColumn(new StringConstant("f1", "abc"))
    // An empty column name is expected to be replaced by "f_1" (asserted below).
    bigData.addColumn(new StringConstant("", "def"))

    assert("f1" === bigData.cols(0).name)
    assert("f_1" === bigData.cols(1).name)
  }
}
开发者ID:dondrake,项目名称:BigDataMaker,代码行数:30,代码来源:TestBigDataMaker.scala
示例12: CSVUtils
//设置package包名称以及导入依赖的类
package edu.gatech.cse8803.ioutils
import com.databricks.spark.csv.CsvContext
import org.apache.spark.sql.{DataFrame, SQLContext}
/** Helpers for loading CSV files as temporary SQL tables. */
object CSVUtils {
  // Captures the last run of word characters in the path, optionally followed by ".csv".
  private val pattern = "(\\w+)(\\.csv)?$".r.unanchored

  /** Loads `path` and registers it under the table name inferred from the path. */
  def loadCSVAsTable(sqlContext: SQLContext, path: String): DataFrame = {
    val inferredName = inferTableNameFromPath(path)
    loadCSVAsTable(sqlContext, path, inferredName)
  }

  /** Loads `path` as a DataFrame, registers it as temp table `tableName` and returns it. */
  def loadCSVAsTable(sqlContext: SQLContext, path: String, tableName: String): DataFrame = {
    val table = sqlContext.csvFile(path)
    table.registerTempTable(tableName)
    table
  }

  /** Returns the file name without its `.csv` extension, or `path` itself when nothing matches. */
  def inferTableNameFromPath(path: String) = {
    path match {
      case pattern(baseName, _) => baseName
      case _ => path
    }
  }
}
开发者ID:powersj,项目名称:spark4achilles,代码行数:25,代码来源:CSVUtils.scala
示例13: MafExample
//设置package包名称以及导入依赖的类
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
/**
 * Loads a tab-separated MAF file, registers it as the `mutations` view and
 * writes three query results as CSV under `results/`.
 */
object MafExample {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MAF Example")
    val sc = new SparkContext(conf)
    val spark = SparkSession
      .builder()
      .getOrCreate()
    val sqlContext = new SQLContext(sc)

    // The delimiter option was set twice with the same value; once is enough.
    val df = sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", "\t")
      .option("comment", "#")
      .load("TCGA.ACC.mutect.abbe72a5-cb39-48e4-8df5-5fd2349f2bb2.somatic.maf")

    df.createOrReplaceTempView("mutations")

    // The 20 most frequent Hugo_Symbol values over all rows.
    val topTwenty = spark.sql("SELECT Hugo_Symbol, count(*) FROM mutations GROUP BY Hugo_symbol ORDER BY count(*) DESC LIMIT 20")

    // The same ranking restricted to rows classified as Missense_Mutation.
    val topTwentyMissense = spark.sql("SELECT Hugo_Symbol, count(*) FROM mutations WHERE Variant_Classification='Missense_Mutation' GROUP BY Hugo_symbol ORDER BY count(*) DESC LIMIT 20")

    // Every row whose Hugo_Symbol is FAT4.
    val fat4 = spark.sql("SELECT Chromosome, Start_Position, End_Position, Strand, Variant_Classification, Variant_Type, Tumor_Sample_Barcode FROM mutations WHERE Hugo_Symbol='FAT4'")

    // coalesce(1) reduces each result to a single partition before writing.
    topTwenty.coalesce(1).write.format("com.databricks.spark.csv").save("results/topTwenty")
    topTwentyMissense.coalesce(1).write.format("com.databricks.spark.csv").save("results/topTwentyMissense")
    fat4.coalesce(1).write.format("com.databricks.spark.csv").save("results/fat4")
  }
}
开发者ID:allisonheath,项目名称:sparktoys,代码行数:40,代码来源:MafExample.scala
示例14: QueryExecution
//设置package包名称以及导入依赖的类
package edu.utah.cs.simba.execution
import edu.utah.cs.simba.SimbaContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, QueryExecution => SQLQueryExecution}
/**
 * Query execution that inserts Simba's index lookup, optimizer and planner
 * into the phases of Spark SQL's own query execution.
 *
 * @param simbaContext context providing the index manager, optimizers and planner
 * @param simbaLogical logical plan to execute
 */
class QueryExecution(val simbaContext: SimbaContext, val simbaLogical: LogicalPlan)
  extends SQLQueryExecution(simbaContext, simbaLogical) {

  /** The cached-data plan after the index manager has processed it. */
  lazy val withIndexedData: LogicalPlan = {
    assertAnalyzed()
    simbaContext.indexManager.useIndexedData(withCachedData)
  }

  /** Runs the SQL optimizer first, then Simba's optimizer on its result. */
  override lazy val optimizedPlan: LogicalPlan = {
    val sqlOptimized = simbaContext.getSQLOptimizer.execute(withIndexedData)
    simbaContext.simbaOptimizer.execute(sqlOptimized)
  }

  /** Marks both contexts active, then takes the first plan offered by Simba's planner. */
  override lazy val sparkPlan: SparkPlan = {
    SQLContext.setActive(sqlContext)
    SimbaContext.setActive(simbaContext)
    val candidatePlans = simbaContext.simbaPlanner.plan(optimizedPlan)
    candidatePlans.next()
  }
}
开发者ID:zdccc,项目名称:SimbaExpand,代码行数:26,代码来源:QueryExecution.scala
示例15: Titanic
//设置package包名称以及导入依赖的类
package fr.ippon.spark.ml
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.{functions, Column, DataFrame, SQLContext}
/** Helpers for loading a Titanic CSV file and filling in missing ages. */
object Titanic {

  /** Loads a Titanic CSV file (with header, types inferred) into a DataFrame. */
  def dataframeFromTitanicFile(sqlc: SQLContext, file: String): DataFrame = {
    val reader = sqlc.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
    reader.load(file)
  }

  /** Computes the mean of column `inputCol`. */
  def calcMeanAge(df: DataFrame, inputCol: String): Double = {
    val averaged = df.agg(functions.avg(df(inputCol)))
    averaged.head.getDouble(0)
  }

  /**
   * Adds column `outputCol` holding the value of `inputCol` when that value is
   * a Double, and `replacementValue` otherwise.
   */
  def fillMissingAge(df: DataFrame, inputCol: String, outputCol: String, replacementValue: Double): DataFrame = {
    // Any value that is not a Double (a null, for instance) falls through to the replacement.
    val ageValue: (Any) => Double = {
      case known: Double => known
      case _ => replacementValue
    }
    df.withColumn(outputCol, functions.callUDF(ageValue, DoubleType, df(inputCol)))
  }
}
开发者ID:ippontech,项目名称:spark-bbl-prez,代码行数:31,代码来源:Titanic.scala
示例16: SQLContextSingleton
//设置package包名称以及导入依赖的类
package org.iamShantanu101.spark.SentimentAnalyzer.utils
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
/** Holds a single SQLContext, created on first request and reused afterwards. */
object SQLContextSingleton {

  @transient
  @volatile private var instance: SQLContext = _

  /**
   * Returns the shared SQLContext, creating it from `sparkContext` on the
   * first call.
   *
   * The field is checked once without the lock and again inside
   * `synchronized`, so the context is assigned at most once.
   */
  def getInstance(sparkContext: SparkContext): SQLContext = {
    val alreadyCreated = instance != null
    if (!alreadyCreated) {
      synchronized {
        // Re-check under the lock: another thread may have created it meanwhile.
        if (instance == null) {
          instance = SQLContext.getOrCreate(sparkContext)
        }
      }
    }
    instance
  }
}
开发者ID:iamShantanu101,项目名称:Twitter-Sentiment-Analyzer-Apache-Spark-Mllib,代码行数:23,代码来源:SQLContextSingleton.scala
示例17: RatePredictor
//设置package包名称以及导入依赖的类
package com.ferhtaydn.rater
import akka.actor.ActorSystem
import com.ferhtaydn.models.PatientInfo
import org.apache.spark.ml.feature.StringIndexerModel
import org.apache.spark.ml.tuning.CrossValidatorModel
import org.apache.spark.mllib.linalg.{ Matrix, Vector }
import org.apache.spark.sql.{ Row, SQLContext }
import scala.concurrent.{ ExecutionContextExecutor, Future }
/**
 * Serves predictions from a previously trained cross-validated model.
 *
 * All work runs in Futures on the dispatcher looked up under
 * `ml.predictor.dispatcher`.
 *
 * @param system          actor system used to look up the dispatcher
 * @param sqlContext      SQL context used to build the one-row DataFrame
 * @param indexModel      fitted string indexer applied before prediction
 * @param cvModel         fitted cross-validated model
 * @param confusionMatrix confusion matrix, already rendered as a string
 */
class RatePredictor(system: ActorSystem, sqlContext: SQLContext,
    indexModel: StringIndexerModel, cvModel: CrossValidatorModel,
    confusionMatrix: String) {

  // NOTE(review): this formatter is shared by every Future started in `predict`,
  // and java.text.DecimalFormat is documented as not synchronized - confirm that
  // concurrent predictions cannot corrupt it.
  private val decimalFormatter = new java.text.DecimalFormat("##.##")
  private val blockingDispatcher: ExecutionContextExecutor = system.dispatchers.lookup("ml.predictor.dispatcher")

  /** Returns the confusion matrix string inside a Future. */
  def confusionMatrixString: Future[String] = {
    Future {
      confusionMatrix
    }(blockingDispatcher)
  }

  /**
   * Predicts for a single patient.
   *
   * @param patientInfo patient to score
   * @return `Right` with element 1 of the probability vector, formatted with
   *         the `##.##` pattern, or `Left` with a message when the model
   *         produced no row
   */
  def predict(patientInfo: PatientInfo): Future[Either[String, Double]] = {
    Future {
      val df = sqlContext.createDataFrame(Seq(patientInfo.toRecord))
      val indexedJobDF = indexModel.transform(df)

      // The scraped source had every case arrow garbled to '?'; restored as `=>`.
      val result = cvModel
        .transform(indexedJobDF)
        .select("prediction", "probability")
        .map {
          case Row(prediction: Double, probability: Vector) =>
            (probability, prediction)
        }

      result.collect().headOption match {
        case Some((prob, _)) => Right(decimalFormatter.format(prob(1)).toDouble)
        case None => Left(s"No result can be predicted for the patient")
      }
    }(blockingDispatcher)
  }
}
开发者ID:ferhtaydn,项目名称:canceRater,代码行数:48,代码来源:RatePredictor.scala
示例18: DefaultSource
//设置package包名称以及导入依赖的类
package com.jasonfeist.spark.tika
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types._
/**
 * Data source entry point for `com.jasonfeist.spark.tika`; creates a
 * `TikaMetadataRelation` with either a caller-supplied or the default schema.
 */
class DefaultSource
  extends RelationProvider with SchemaRelationProvider {

  /**
   * Creates the relation with an explicit schema.
   * Fails with "No path specified." when the `path` parameter is missing.
   */
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
    val path = parameters.getOrElse("path", sys.error("No path specified."))
    new TikaMetadataRelation(
      path,
      schema,
      new MetadataExtractor(),
      new FieldDataExtractor())(sqlContext)
  }

  /** Creates the relation with the default schema below; every field is nullable. */
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
    val defaultFields = Seq(
      StructField("detectedtype", StringType, nullable = true),
      StructField("language", StringType, nullable = true),
      StructField("filename", StringType, nullable = true),
      StructField("author", StringType, nullable = true),
      StructField("text", StringType, nullable = true),
      StructField("creation-date", TimestampType, nullable = true),
      StructField("title", StringType, nullable = true),
      StructField("content-length", IntegerType, nullable = true),
      StructField("last-modified", DateType, nullable = true)
    )
    createRelation(sqlContext, parameters, StructType(defaultFields))
  }
}
开发者ID:jasonfeist,项目名称:tika-spark-datasource,代码行数:36,代码来源:DefaultSource.scala
示例19: TikaMetadataRelation
//设置package包名称以及导入依赖的类
package com.jasonfeist.spark.tika
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{StructType}
import org.slf4j.LoggerFactory
/**
 * Relation that scans the binary files under `path` and turns each file into
 * one Row shaped by `userSchema`.
 *
 * @param path               location handed to `SparkContext.binaryFiles`
 * @param userSchema         schema of the produced rows
 * @param metadataExtractor  extracts the raw data of one file
 * @param fieldDataExtractor maps the extracted data to the value of one schema field
 * @param sqlContext         SQL context; transient, so it is not serialized with the relation
 */
class TikaMetadataRelation protected[tika] (path: String,
                                            userSchema: StructType,
                                            metadataExtractor: MetadataExtractor,
                                            fieldDataExtractor: FieldDataExtractor)
                                           (@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan with Serializable {

  val logger = LoggerFactory.getLogger(classOf[TikaMetadataRelation])

  override def schema: StructType = this.userSchema

  /** Builds one Row per binary file found under `path`. */
  override def buildScan(): RDD[Row] = {
    val files = sqlContext.sparkContext.binaryFiles(path)
    files.map(file => extractFunc(file))
  }

  /**
   * Extracts the data of one file and produces a Row with one value per
   * schema field, in schema order.
   *
   * @param file pair of file name and its data stream
   */
  def extractFunc(file: (String, PortableDataStream)): Row = {
    val extracted = metadataExtractor.extract(file)

    // One value per field; the extractor receives the field's name and type,
    // the three extracted components and the file name.
    val values = schema.fields.toSeq.map { field =>
      fieldDataExtractor.matchedField(field.name,
        field.dataType, extracted._1, file._1, extracted._2,
        extracted._3)
    }
    Row.fromSeq(values)
  }
}
开发者ID:jasonfeist,项目名称:tika-spark-datasource,代码行数:47,代码来源:TikaMetadataRelation.scala
示例20: SimpleApp
//设置package包名称以及导入依赖的类
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
/**
 * Reads `/tmp/cars.csv`, shows it, and writes its `year` and `model` columns
 * to `/tmp/cars_new.csv`.
 */
object SimpleApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    val csvReader = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true") // use the first line of all files as header
      .option("inferSchema", "true") // automatically infer data types
    val cars = csvReader.load("/tmp/cars.csv")
    cars.show()

    // coalesce(1) merges all partitions into one before writing.
    val yearAndModel = cars.select("year", "model").coalesce(1)
    yearAndModel.write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save("/tmp/cars_new.csv")
  }
}
开发者ID:mykumar,项目名称:SparkScalaInternalExperiements,代码行数:29,代码来源:SimpleApp.scala
注：本文中的org.apache.spark.sql.SQLContext类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论