本文整理汇总了Scala中org.apache.spark.sql.types.StructType类的典型用法代码示例。如果您正苦于以下问题：Scala StructType类的具体用法？Scala StructType怎么用？Scala StructType使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了StructType类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: SimpleApp
//设置package包名称以及导入依赖的类
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.ml.clustering.LDA
import org.apache.spark.mllib.linalg.{VectorUDT, Vectors}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StructField, StructType}
/** Small Spark application that fits an LDA topic model on vectors read from a text file. */
object SimpleApp {

  /**
   * Reads `/tmp/lda_data.txt`, fits a 10-topic LDA model on it and prints the
   * input dataset, the top terms per topic and the transformed dataset.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Simple Application")
      .set("spark.ui.enabled", "false")
    val sc = new SparkContext(conf)
    // Stop the context even when the job fails so the application shuts down cleanly.
    try {
      val sqlContext = new SQLContext(sc)

      // Each non-empty line is a space-separated list of doubles -> one dense vector per row.
      val rowRDD = sc.textFile("/tmp/lda_data.txt")
        .filter(_.nonEmpty)
        .map(line => Row(Vectors.dense(line.split(" ").map(_.toDouble))))
      val schema = StructType(Array(StructField("name", new VectorUDT, false)))
      val dataset = sqlContext.createDataFrame(rowRDD, schema)
      dataset.show()

      val lda = new LDA()
        .setK(10)
        .setMaxIter(10)
        .setFeaturesCol("name")
      val model = lda.fit(dataset)
      val transformed = model.transform(dataset)

      // Evaluated as in the original example; the values are not used afterwards.
      val ll = model.logLikelihood(dataset)
      val lp = model.logPerplexity(dataset)

      // Top 3 terms for every topic.
      val topics = model.describeTopics(3)

      topics.show(false)
      transformed.show(false)
    } finally {
      sc.stop()
    }
  }
}
开发者ID:mykumar,项目名称:SparkScalaInternalExperiements,代码行数:41,代码来源:SimpleApp.scala
示例2: StudyRDD
//设置package包名称以及导入依赖的类
package com.study.spark.datasource
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
/**
 * RDD of [[Row]]s whose partitions are read through a `StudyReader`.
 *
 * The RDD has no parent dependencies and always exposes exactly two partitions.
 *
 * @param sqlContext context whose SparkContext owns this RDD
 * @param schema     schema handed to every `StudyReader`
 */
class StudyRDD(sqlContext: SQLContext, schema: StructType)
  extends RDD[Row](sqlContext.sparkContext, deps = Nil) {

  /** Creates a new `StudyReader` for the given partition and task context. */
  @DeveloperApi
  override def compute(split: Partition, context: TaskContext): Iterator[Row] =
    new StudyReader(context, schema, split)

  /** Returns a fixed array of two partitions with indices 0 and 1. */
  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](2) { partitionIndex =>
      new Partition() {
        override def index: Int = partitionIndex
      }
    }
}
开发者ID:hackpupu,项目名称:LML,代码行数:27,代码来源:StudyRDD.scala
示例3: SchemaTest
//设置package包名称以及导入依赖的类
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import com.gxq.learn.recontool.utils.SparkContextFactory
/** Demonstrates building a DataFrame from an RDD of delimited strings and an explicit schema. */
object SchemaTest {

  /**
   * Parses the mock line `Bern;10;12` into a (city, female, male) row and shows the DataFrame.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val (sc, ss) = SparkContextFactory.getSparkContext("local")

    val schema = new StructType()
      .add("city", StringType, nullable = true)
      .add("female", IntegerType, nullable = true)
      .add("male", IntegerType, nullable = true)

    // mock for real data
    val data = sc.parallelize(Seq("Bern;10;12"))

    // A line that does not split into exactly three parts fails with a MatchError.
    val cities = data.map { line =>
      line.split(";") match {
        case Array(city, female, male) => Row(city, female.toInt, male.toInt)
      }
    }

    val citiesDF = ss.createDataFrame(cities, schema)
    citiesDF.show
  }
}
开发者ID:gong1989313,项目名称:spark-bigdata,代码行数:30,代码来源:SchemaTest.scala
示例4: DatabaseBackup
//设置package包名称以及导入依赖的类
package unus.stage
import unus.helpers.Conf
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
import scala.reflect.runtime.universe._
import org.apache.spark.sql.SaveMode
/**
 * Copies a database table to CSV files under `Conf.dataDir` and back again.
 *
 * @param table name of the database table; it is wrapped in double quotes when passed to JDBC
 * @tparam T type whose reflected schema is used when reading the CSV back
 */
class DatabaseBackup[T: TypeTag](table: String) {

  // Schema derived from T by reflection; only needed by load().
  private lazy val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]

  /** Reads the table over JDBC and writes it as CSV (with header) to `<dataDir>/<table>.csv`. */
  def save(): Unit = {
    val tableData = Conf.spark.read
      .format("jdbc")
      .option("url", Conf.dbUrl)
      .option("dbtable", s""""$table"""")
      .option("user", Conf.dbUsername)
      .option("password", Conf.dbPassword)
      .option("jdbcdriver","org.postgresql.Driver")
      .load()

    val csvWriter = tableData.write
      .format("csv")
      .option("header", "true")
    csvWriter.save(Conf.dataDir + "/" + table + ".csv")
  }

  /**
   * Reads `<dataDir>/<table>.csv.gz` as CSV (with header, using the reflected schema)
   * and appends the rows to the table over JDBC.
   */
  def load(): Unit = {
    // NOTE(review): save() writes to "<table>.csv" but this reads "<table>.csv.gz" —
    // confirm whether the file is expected to be compressed/renamed in between.
    val csvData = Conf.spark.read
      .format("csv")
      .option("header", "true")
      .schema(schema)
      .load(Conf.dataDir + "/" + table + ".csv.gz")

    val jdbcWriter = csvData.write
      .format("jdbc")
      .option("url", Conf.dbUrl)
      .option("dbtable", s""""$table"""")
      .option("user", Conf.dbUsername)
      .option("password", Conf.dbPassword)
      .option("jdbcdriver","org.postgresql.Driver")
      .mode(SaveMode.Append)
    jdbcWriter.save()
  }
}
开发者ID:mindfulmachines,项目名称:unus,代码行数:44,代码来源:DatabaseBackup.scala
示例5: User
//设置package包名称以及导入依赖的类
package services.users
import java.io.File
import java.nio.charset.Charset
import java.sql.Timestamp
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.StreamingContext
import services.events.EventStream
import services.Util
/** A user record: id, time the test was finished, nickname and gender. */
case class User(userId: String, testFinishTime: Timestamp, nickname: String, gender: String)

/**
 * Joins the "test finished" event stream with the users feed and writes the result as CSV.
 *
 * `initialize` must be called before `usersFeedDF` or `usersMap` are first accessed,
 * because both are lazily built from the `sql` session that `initialize` sets.
 */
object User {
  val DELIMITER = ','
  val USER_FEED = "/Users/mahesh/data/affinitas/feeds/users/"
  val USER_DATA = "/Users/mahesh/data/affinitas/tables/users/"

  // Set by initialize(); null until then.
  var ssc: StreamingContext = null
  var sql: SparkSession = null

  /** Users feed read from USER_FEED as header-less CSV with columns userId, nickname, gender. */
  lazy val usersFeedDF = {
    val feedSchema = StructType(Array(
      StructField("userId", StringType, true),
      StructField("nickname", StringType, true),
      StructField("gender", StringType, true)
    ))
    sql.read
      .format("com.databricks.spark.csv")
      .option("header", false)
      .schema(feedSchema)
      .load(User.USER_FEED)
  }

  //EventStream.testFinishStream.print()

  /** Pair RDD keyed by userId with (nickname, gender) as value. */
  lazy val usersMap = usersFeedDF.rdd.map { row =>
    (row.getString(0), (row.getString(1), row.getString(2)))
  }

  /**
   * Stores the contexts, creates the feed and data directories, and registers an
   * action on the test-finish stream that writes the joined user data to USER_DATA.
   *
   * @param sscO streaming context to keep in `ssc`
   * @param sqlO Spark session to keep in `sql`
   */
  def initialize(sscO: StreamingContext, sqlO: SparkSession) = {
    ssc = sscO
    sql = sqlO
    new File(USER_FEED).mkdirs()
    new File(USER_DATA).mkdirs()

    EventStream.testFinishStream.foreachRDD { finishedRdd =>
      val finishTimeByUser = finishedRdd.map(event => (event.userId, event.timestamp))
      // One array per joined user: userId, finish time, nickname, gender.
      val userData = finishTimeByUser
        .join(usersMap)
        .map { case (userId, (finishTime, (nickname, gender))) =>
          Array(userId, finishTime, nickname, gender)
        }
        .collect()
      Util.writeCsvToDir(userData, DELIMITER.toString, USER_DATA)
    }
  }
}
开发者ID:f13mash,项目名称:spark_log_contact,代码行数:58,代码来源:User.scala
示例6: TikaMetadataRelation
//设置package包名称以及导入依赖的类
package com.jasonfeist.spark.tika
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{StructType}
import org.slf4j.LoggerFactory
/**
 * Relation that reads the binary files under `path` and produces one [[Row]] per file,
 * with one value per field of the user-supplied schema.
 *
 * @param path               location passed to `SparkContext.binaryFiles`
 * @param userSchema         schema reported by this relation
 * @param metadataExtractor  extracts data from a (file name, stream) pair
 * @param fieldDataExtractor produces the value for a single schema field
 * @param sqlContext         SQL context used to reach the SparkContext
 */
class TikaMetadataRelation protected[tika] (
    path: String,
    userSchema: StructType,
    metadataExtractor: MetadataExtractor,
    fieldDataExtractor: FieldDataExtractor)
    (@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan with Serializable {

  val logger = LoggerFactory.getLogger(classOf[TikaMetadataRelation])

  /** The schema supplied by the user. */
  override def schema: StructType = this.userSchema

  /** Maps every binary file under `path` through `extractFunc`. */
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext
      .binaryFiles(path)
      .map(file => extractFunc(file))

  /**
   * Builds the row for a single file.
   *
   * @param file pair of file name and its data stream
   * @return a row holding one extracted value per schema field, in schema order
   */
  def extractFunc(file: (String, PortableDataStream)): Row = {
    val extractedData = metadataExtractor.extract(file)
    val values: Seq[Any] = schema.fields.toSeq.map { field =>
      fieldDataExtractor.matchedField(
        field.name,
        field.dataType,
        extractedData._1,
        file._1,
        extractedData._2,
        extractedData._3)
    }
    Row.fromSeq(values)
  }
}
开发者ID:jasonfeist,项目名称:tika-spark-datasource,代码行数:47,代码来源:TikaMetadataRelation.scala
示例7: StudyRelation
//设置package包名称以及导入依赖的类
package com.study.spark.datasource
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
/**
 * Relation with a fixed three-column string schema whose rows come from a `StudyRDD`.
 *
 * @param parameters data source options (not used by this class)
 * @param sqlContext SQL context used to create the RDD
 */
class StudyRelation(parameters: Map[String, String])(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  /**
   * Fixed schema: three nullable string columns `field1`, `field2` and `field3`.
   *
   * The third column used to be named `field2` as well, which produced a schema
   * with duplicate column names.
   */
  override def schema: StructType =
    StructType(Seq(
      StructField("field1", StringType),
      StructField("field2", StringType),
      StructField("field3", StringType)))

  /** Returns a new `StudyRDD` built with this relation's schema. */
  override def buildScan(): RDD[Row] = new StudyRDD(sqlContext, schema)
}
开发者ID:hackpupu,项目名称:LML,代码行数:24,代码来源:StudyRelation.scala
示例8: StudyReader
//设置package包名称以及导入依赖的类
package com.study.spark.datasource
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
/**
 * Iterator that produces 100 synthetic three-column rows for one partition.
 *
 * @param context task context; when non-null, `close()` is registered as a completion listener
 * @param schema  schema of the relation (not read by this class)
 * @param split   partition being read; its index is embedded in the first column
 */
class StudyReader(context: TaskContext, schema: StructType, split: Partition)
  extends Iterator[Row] {

  // Number of rows handed out so far.
  private[this] var counter: Int = 0

  // Make sure close() runs when the task completes.
  Option(context).foreach { taskContext =>
    taskContext.addTaskCompletionListener(_ => close())
  }

  /** True until 100 rows have been returned. */
  override def hasNext: Boolean = counter < 100

  /**
   * Returns the next synthetic row.
   *
   * @throws NoSuchElementException once all 100 rows have been consumed
   */
  override def next(): Row = {
    if (!hasNext) {
      throw new NoSuchElementException("End of stream")
    }
    counter += 1
    val first = split.index + " field1 " + counter
    val second = "field2 " + counter
    val third = "field3: " + counter
    Row(first, second, third)
  }

  /** Called on task completion; only prints a message. */
  def close() = println("closed")
}
开发者ID:hackpupu,项目名称:LML,代码行数:37,代码来源:StudyReader.scala
示例9: PatientCacher
//设置package包名称以及导入依赖的类
package unus.stage
import unus.helpers.Conf
import unus.db.{Patient, Repository}
import unus.helpers.Cacher
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
/** [[Cacher]] for [[Patient]] records loaded from a CSV file under `Conf.dataDir`. */
class PatientCacher extends Cacher[Patient] {

  override protected val name: String = "patient"

  /**
   * Reads `<dataDir>/FInalDataset.csv` (with header) using the schema reflected
   * from `Patient` and returns the records as an RDD.
   */
  override protected def build(): RDD[Patient] = {
    import Conf.spark.implicits._

    val patientSchema = ScalaReflection.schemaFor[Patient].dataType.asInstanceOf[StructType]
    val csvPath = Conf.dataDir + "/FInalDataset.csv"

    val patients = Conf.spark.read
      .format("csv")
      .schema(patientSchema)
      .option("header", "true")
      .load(csvPath)
      .as[Patient]
    patients.rdd
  }
}
开发者ID:mindfulmachines,项目名称:unus,代码行数:26,代码来源:PatientCacher.scala
示例10: sample
//设置package包名称以及导入依赖的类
package com.rishabh.spark.datasource.s3
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
/** Placeholder record type; kept for source compatibility. */
case class sample(id: Integer)

/**
 * Relation backed by a JSON, CSV or Parquet dataset stored in S3 (via the `s3a` file system).
 *
 * Constructing the relation sets the s3a implementation and credentials on the
 * shared Hadoop configuration of the SparkContext.
 *
 * @param accesskey  value for `fs.s3a.access.key`
 * @param secretkey  value for `fs.s3a.secret.key`
 * @param fileType   one of `json`, `csv` or `parquet`
 * @param bucket     bucket name
 * @param path       path appended directly after the bucket name
 * @param write      not used by this class
 * @param sqlContext SQL context used for reading
 */
class S3Relation(
    accesskey: String,
    secretkey: String,
    fileType: String,
    bucket: String,
    path: String,
    write: Boolean)
    (@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  // No longer used for scanning; kept so existing references to the member still compile.
  val dummyData = Seq(sample(1))

  val s3Path = "s3a://" + bucket + path

  val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
  hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  hadoopConf.set("fs.s3a.access.key", accesskey)
  hadoopConf.set("fs.s3a.secret.key", secretkey)

  /**
   * The S3 dataset, loaded once on first access.
   *
   * Previously `schema` reloaded the data on every call and `buildScan` returned
   * a dummy one-row DataFrame if `schema` had not been called first. Loading lazily
   * here makes both methods independent of call order. The Hadoop configuration
   * above is set during construction, so it is in place before the first read.
   */
  lazy val df: org.apache.spark.sql.DataFrame = fileType match {
    case "json" =>
      sqlContext.read.json(s3Path)
    case "csv" =>
      sqlContext.read.format("com.databricks.spark.csv").load(s3Path)
    case "parquet" =>
      sqlContext.read.parquet(s3Path)
    case other =>
      // Fail with a descriptive message instead of a bare MatchError.
      throw new IllegalArgumentException(
        s"Unsupported fileType '$other'; expected one of: json, csv, parquet")
  }

  /** Schema of the loaded dataset. */
  override def schema: StructType = df.schema

  /** Rows of the loaded dataset. */
  override def buildScan(): RDD[Row] = df.rdd
}
开发者ID:rishabhbhardwaj,项目名称:spark-datasource-s3,代码行数:44,代码来源:S3Relation.scala
示例11: ScorePredictor
//设置package包名称以及导入依赖的类
package org.wikimedia.research.recommendation.job.translation
import java.io.File
import org.apache.log4j.{LogManager, Logger}
import org.apache.spark.ml.regression.RandomForestRegressionModel
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import scala.collection.parallel.mutable.ParArray
/** Scores feature data with one random-forest model per target site and saves the joined result. */
object ScorePredictor {
  val log: Logger = LogManager.getLogger(ScorePredictor.getClass)

  /**
   * Scores `featureData` for every site and, if an output directory is given,
   * writes all predictions outer-joined on `id` as a single bzip2-compressed CSV.
   *
   * A site whose scoring fails with a non-fatal error is logged and contributes an
   * empty (id, site) DataFrame. When `sites` is empty nothing is written.
   *
   * @param spark                active Spark session
   * @param modelsInputDir       directory containing one saved model per site name
   * @param predictionsOutputDir directory for the `allPredictions` output, if any
   * @param sites                target sites to score (processed in parallel)
   * @param featureData          features passed to `Utils.getWorkData`
   */
  def predictScores(spark: SparkSession,
                    modelsInputDir: File,
                    predictionsOutputDir: Option[File],
                    sites: ParArray[String],
                    featureData: DataFrame): Unit = {
    log.info("Scoring items")

    val predictions: Array[DataFrame] =
      sites.map(target => scoreSite(spark, modelsInputDir, featureData, target)).toArray

    // reduceOption avoids the UnsupportedOperationException that reduce throws on an empty array.
    predictions.reduceOption((left, right) => left.join(right, Seq("id"), "outer")) match {
      case None =>
        log.warn("No sites were given; no predictions to save")
      case Some(predictedScores) =>
        log.info("Saving predictions")
        predictionsOutputDir.foreach { outputDir =>
          predictedScores.coalesce(1)
            .write
            .mode(SaveMode.ErrorIfExists)
            .option("header", value = true)
            .option("compression", "bzip2")
            .csv(new File(outputDir, "allPredictions").getAbsolutePath)
        }
    }
  }

  /** Loads the model for `target` and returns its (id, target) predictions, or an empty frame on failure. */
  private def scoreSite(spark: SparkSession,
                        modelsInputDir: File,
                        featureData: DataFrame,
                        target: String): DataFrame =
    try {
      log.info("Scoring for " + target)
      log.info("Getting work data for " + target)
      val workData: DataFrame = Utils.getWorkData(spark, featureData, target, exists = false)
      log.info("Loading model for " + target)
      val model = RandomForestRegressionModel.load(
        new File(modelsInputDir, target).getAbsolutePath)
      log.info("Scoring data for " + target)
      model
        .setPredictionCol(target)
        .transform(workData)
        .select("id", target)
    } catch {
      // Only recover from non-fatal errors; OutOfMemoryError, InterruptedException etc. propagate.
      case scala.util.control.NonFatal(error) =>
        log.error("Score for " + target + " failed", error)
        emptyPredictions(spark, target)
    }

  /** Empty DataFrame with the same (id, target) columns a successful scoring would produce. */
  private def emptyPredictions(spark: SparkSession, target: String): DataFrame = {
    val schema = StructType(Seq(
      StructField("id", StringType, nullable = false),
      StructField(target, DoubleType, nullable = true)))
    spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
  }
}
开发者ID:schana,项目名称:recommendation-translation,代码行数:59,代码来源:ScorePredictor.scala
示例12: Dictionary
//设置package包名称以及导入依赖的类
package com.github.uosdmlab.nkp
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
/**
 * Mutable user dictionary that is pushed to the Eunjeon analyzer.
 *
 * All mutating methods return this object so calls can be chained.
 */
object Dictionary {

  import org.bitbucket.eunjeon.seunjeon.{Analyzer => EunjeonAnalyzer}

  // Every word added so far, in insertion order.
  private var words = Seq.empty[String]

  /** Evaluates `fn` for its side effects and returns this object. */
  private def chain(fn: => Any): this.type = {
    fn
    this
  }

  /** Hands the current word list to the analyzer as its user dictionary. */
  private def syncDictionary(): Unit = EunjeonAnalyzer.setUserDict(words.toIterator)

  /** Adds one or more words. */
  def addWords(word: String, words: String*): this.type = addWords(word +: words)

  /** Appends the given words to the dictionary and syncs the analyzer. */
  def addWords(words: Traversable[String]): this.type = chain {
    this.words = this.words ++ words
    syncDictionary()
  }

  /** Resets the analyzer's user dictionary and clears the stored words. */
  def reset(): this.type = chain {
    EunjeonAnalyzer.resetUserDict()
    words = Seq.empty[String]
  }

  /** Adds words from one or more CSV files. */
  def addWordsFromCSV(path: String, paths: String*): this.type =
    addWordsFromCSV(path +: paths)

  /**
   * Reads header-less, comma-separated (word, cost) files and adds each entry.
   *
   * Rows with a cost are added as `word,cost`; rows whose cost is null are added as `word`.
   */
  def addWordsFromCSV(paths: Traversable[String]): this.type = chain {
    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    val csvSchema = StructType(Array(
      StructField("word", StringType, nullable = false),
      StructField("cost", StringType, nullable = true)))

    val csvRows = spark.read
      .option("sep", ",")
      .option("inferSchema", value = false)
      .option("header", value = false)
      .schema(csvSchema)
      .csv(paths.toSeq: _*)

    val entries = csvRows
      .map {
        case Row(word: String, cost: String) => s"$word,$cost"
        case Row(word: String, null) => word
      }
      .collect()

    addWords(entries)
  }
}
开发者ID:uosdmlab,项目名称:spark-nkp,代码行数:62,代码来源:Dictionary.scala
示例13: EmptyRDFGraphDataFrame
//设置package包名称以及导入依赖的类
package net.sansa_stack.ml.spark.mining.amieSpark
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
/**
 * Factory for an empty triples DataFrame.
 *
 * @author Lorenz Buehmann
 */
object EmptyRDFGraphDataFrame {

  /**
   * Creates an empty DataFrame with nullable string columns `subject`, `predicate`
   * and `object`, and registers it as the temp view `TRIPLES`.
   *
   * @param sqlContext SQL context used to create the DataFrame
   * @return the empty triples DataFrame
   */
  def get(sqlContext: SQLContext): DataFrame = {
    // One nullable string column per name in the space-separated list.
    val columnNames = "subject predicate object".split(" ")
    val schema = StructType(
      for (columnName <- columnNames) yield StructField(columnName, StringType, true))

    // An RDD without rows, typed with the schema above.
    val noRows = sqlContext.sparkContext.emptyRDD[Row]
    val triplesDataFrame = sqlContext.createDataFrame(noRows, schema)

    // Make the DataFrame addressable from SQL.
    triplesDataFrame.createOrReplaceTempView("TRIPLES")
    triplesDataFrame
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-ML,代码行数:30,代码来源:EmptyRDFGraphDataFrame.scala
示例14: DefaultSource
//设置package包名称以及导入依赖的类
package com.springml.spark.zuora
import com.springml.spark.zuora.model.ZuoraInput
import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType
import scala.collection.mutable
/**
 * Spark data source entry point for Zuora: supports reading (with or without a
 * user-supplied schema); saving is not supported.
 */
class DefaultSource extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
  @transient val logger = Logger.getLogger(classOf[DefaultSource])

  /** Creates a read relation without a user-supplied schema (passes `null` as schema). */
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation = {
    createRelation(sqlContext, parameters, null)
  }

  /**
   * Reads the records for the given ZOQL query and wraps them in a `DatasetRelation`.
   *
   * Required parameters: `email`, `password`, `zoql`.
   * Optional parameters: `instanceURL` (default `https://rest.zuora.com`),
   * `apiVersion` (default `38.0`), `pageSize` (default `1000`, must be an integer).
   *
   * @throws RuntimeException      if a required parameter is missing
   * @throws NumberFormatException if `pageSize` is not an integer
   */
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String],
                              schema: StructType): BaseRelation = {
    val email = param(parameters, "email")
    val password = param(parameters, "password")
    val zoql = param(parameters, "zoql")
    val instanceUrl = parameters.getOrElse("instanceURL", "https://rest.zuora.com")
    val apiVersion = parameters.getOrElse("apiVersion", "38.0")
    // TODO (carried over from the original source; its intent is not stated there)
    val pageSizeParam = parameters.getOrElse("pageSize", "1000")
    val pageSize = pageSizeParam.toInt

    val zuoraInput = new ZuoraInput(email, password, zoql, instanceUrl, apiVersion, pageSize)
    val records = new ZuoraReader(zuoraInput).read()
    new DatasetRelation(records, sqlContext, schema)
  }

  /** Saving is not supported; always throws `UnsupportedOperationException`. */
  override def createRelation(sqlContext: SQLContext,
                              mode: SaveMode,
                              parameters: Map[String, String],
                              data: DataFrame): BaseRelation = {
    logger.error("Save not supported by Zuora connector")
    throw new UnsupportedOperationException
  }

  /**
   * Returns the value of a required parameter.
   *
   * The value is logged at debug level for every parameter except `password`.
   * The original condition was inverted and logged only the password.
   *
   * @throws RuntimeException if the parameter is not present
   */
  private def param(parameters: Map[String, String],
                    paramName: String): String = {
    val paramValue = parameters.getOrElse(paramName,
      sys.error(s"""'$paramName' must be specified for Spark Zuora package"""))

    // Never write the credential to the log.
    if (!"password".equals(paramName)) {
      logger.debug("Param " + paramName + " value " + paramValue)
    }

    paramValue
  }
}
开发者ID:springml,项目名称:spark-zuora,代码行数:60,代码来源:DefaultSource.scala
示例15: Document
//设置package包名称以及导入依赖的类
package sparknlp
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
/**
 * A text document with an id and free-form string metadata.
 *
 * @param id       document identifier
 * @param text     document content
 * @param metadata key/value metadata; empty by default
 */
case class Document(
  id: String,
  text: String,
  metadata: scala.collection.Map[String, String] = Map()
)

object Document {

  /** Builds a Document from a row laid out as (id, text, metadata). */
  def apply(row: Row): Document = {
    val id = row.getString(0)
    val text = row.getString(1)
    val metadata = row.getMap[String, String](2)
    Document(id, text, metadata)
  }

  /** Spark SQL struct matching the Document fields; every field is nullable. */
  val DocumentDataType: StructType = {
    val metadataType = MapType(StringType, StringType, valueContainsNull = true)
    StructType(Array(
      StructField("id", StringType, nullable = true),
      StructField("text", StringType, nullable = true),
      StructField("metadata", metadataType, nullable = true)
    ))
  }
}
开发者ID:alexander-n-thomas,项目名称:sparknlp,代码行数:23,代码来源:Document.scala
示例16: MockSessionSpec
//设置package包名称以及导入依赖的类
package test.yumi.pipeline
import com.typesafe.config.Config
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrameReader, SparkSession}
import org.apache.spark.sql.types.StructType
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import yumi.Job
import yumi.metastore.Metastore
import yumi.pipeline._
/**
 * Spec base that provides a scope with pre-wired Spark and Yumi mocks.
 *
 * `mock`, `when` and `any` are not defined here — presumably they come from
 * `BaseSpec` / `BaseMockScope`; confirm there.
 */
trait MockSessionSpec extends BaseSpec {

  /** Scope in which the Spark session, reader and Yumi context are mocked and stubbed. */
  trait MockSessionScope extends BaseMockScope {

    // Spark mocks.
    val sparkSession = mock[SparkSession]
    val sparkContext = mock[SparkContext]
    val dataFrameReader = mock[DataFrameReader]
    val dataFrameWriter = mock[DataFrameWriter]

    // Yumi mocks.
    implicit val yumiContext = mock[YumiContext]
    val pipelineFactory = mock[PipelineFactory]
    val activityLoader = mock[ActivityLoader]
    val contextFactory = mock[YumiContextFactory]

    val emptyParameters = new Parameters

    // The session hands out the mocked context and reader.
    when(sparkSession.sparkContext).thenReturn(sparkContext)
    when(sparkSession.read).thenReturn(dataFrameReader)

    // The Yumi context exposes the mocked Spark objects and writer.
    when(yumiContext.sparkSession).thenReturn(sparkSession)
    when(yumiContext.sparkContext).thenReturn(sparkContext)
    when(yumiContext.dataFrameWriter).thenReturn(dataFrameWriter)

    // Builder-style reader calls return the reader itself so they can be chained.
    when(dataFrameReader.format(any[String])).thenReturn(dataFrameReader)
    when(dataFrameReader.schema(any[StructType])).thenReturn(dataFrameReader)
    when(dataFrameReader.option(any[String], any[String])).thenReturn(dataFrameReader)
    when(dataFrameReader.options(any[Map[String, String]])).thenReturn(dataFrameReader)
  }
}
开发者ID:coderdiaries,项目名称:yumi,代码行数:41,代码来源:MockSessionSpec.scala
示例17: CountByRatingChart
//设置package包名称以及导入依赖的类
package org.sparksamples
import java.awt.Font
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.jfree.chart.axis.CategoryLabelPositions
import scalax.chart.module.ChartFactories
/**
 * Draws a bar chart with the number of ratings per rating value in the ml-100k `u.data` file.
 *
 * The `object`/`main` header was missing from this snippet, leaving two unmatched
 * closing braces; it is restored here using the source file name `CountByRatingChart.scala`.
 */
object CountByRatingChart {

  /**
   * Loads the ratings, counts them per rating value and shows the chart.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val customSchema = StructType(Array(
      StructField("user_id", IntegerType, true),
      StructField("movie_id", IntegerType, true),
      StructField("rating", IntegerType, true),
      StructField("timestamp", IntegerType, true)))

    val spConfig = (new SparkConf).setMaster("local").setAppName("SparkApp")
    val spark = SparkSession
      .builder()
      .appName("SparkRatingData").config(spConfig)
      .getOrCreate()

    val rating_df = spark.read.format("com.databricks.spark.csv")
      .option("delimiter", "\t").schema(customSchema)
      .load("../../data/ml-100k/u.data")

    // One row per rating value: (rating, count), ordered by rating.
    val rating_df_count = rating_df.groupBy("rating").count().sort("rating")
    rating_df_count.show()
    val rating_df_count_collection = rating_df_count.collect()

    val ds = new org.jfree.data.category.DefaultCategoryDataset
    // NOTE(review): the series label "UserAges" looks copied from another chart — confirm.
    rating_df_count_collection.foreach { row =>
      val occ = row(0)
      val count = Integer.parseInt(row(1).toString)
      ds.addValue(count, "UserAges", occ.toString)
    }

    val chart = ChartFactories.BarChart(ds)
    val font = new Font("Dialog", Font.PLAIN, 5)

    chart.peer.getCategoryPlot.getDomainAxis()
      .setCategoryLabelPositions(CategoryLabelPositions.UP_90)
    chart.peer.getCategoryPlot.getDomainAxis.setLabelFont(font)
    chart.show()
    // NOTE(review): `Util.sc` is defined outside this snippet — confirm it exists in the project.
    Util.sc.stop()
  }
}
开发者ID:PacktPublishing,项目名称:Machine-Learning-with-Spark-Second-Edition,代码行数:59,代码来源:CountByRatingChart.scala
示例18: UserRatingsChart
//设置package包名称以及导入依赖的类
package org.sparksamples
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import scalax.chart.module.ChartFactories
/** Draws a bar chart with the number of ratings given by each user in the ml-100k `u.data` file. */
object UserRatingsChart {

  /**
   * Loads the ratings, counts them per user, prints every count and shows the chart.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val customSchema = StructType(Array(
      StructField("user_id", IntegerType, true),
      StructField("movie_id", IntegerType, true),
      StructField("rating", IntegerType, true),
      StructField("timestamp", IntegerType, true)))

    val spConfig = (new SparkConf).setMaster("local").setAppName("SparkApp")
    val spark = SparkSession
      .builder()
      .appName("SparkRatingData").config(spConfig)
      .getOrCreate()

    val rating_df = spark.read.format("com.databricks.spark.csv")
      .option("delimiter", "\t").schema(customSchema)
      .load("../../data/ml-100k/u.data")

    // One row per user: (user_id, count), ordered by count.
    val rating_nos_by_user = rating_df.groupBy("user_id").count().sort("count")

    // Collect once and reuse the result; the original ran the same job twice.
    val rating_nos_by_user_collect = rating_nos_by_user.collect()
    rating_nos_by_user.show(rating_nos_by_user_collect.length)

    val ds = new org.jfree.data.category.DefaultCategoryDataset
    rating_nos_by_user_collect.foreach { row =>
      val user_id = Integer.parseInt(row(0).toString)
      val count = Integer.parseInt(row(1).toString)
      ds.addValue(count, "Ratings", user_id)
    }

    // ------------------------------------------------------------------
    val chart = ChartFactories.BarChart(ds)
    chart.peer.getCategoryPlot.getDomainAxis().setVisible(false)
    chart.show()
    // NOTE(review): `Util.sc` is defined outside this snippet — confirm it exists in the project.
    Util.sc.stop()
  }
}
开发者ID:PacktPublishing,项目名称:Machine-Learning-with-Spark-Second-Edition,代码行数:58,代码来源:UserRatingsChart.scala
示例19: Util
//设置package包名称以及导入依赖的类
package org.sparksamples
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
/** Shared paths and helpers for the ml-100k samples. */
object Util {
  val PATH = "/home/ubuntu/work/spark-2.0.0-bin-hadoop2.7/"
  val DATA_PATH = "../../../data/ml-100k"
  val PATH_MOVIES = DATA_PATH + "/u.item"

  /**
   * Reduces a vector to two numbers: the sum of its first half and the sum of its second half.
   *
   * The first half holds `size / 2` elements; the second half holds the rest.
   * The previous implementation started the second sum at index `size / 2 + 1`
   * and therefore skipped the element at index `size / 2`.
   *
   * @param x vector to reduce
   * @return the two sums formatted as `"<firstSum>,<secondSum>"`
   */
  def reduceDimension2(x: Vector): String = {
    val values = x.toArray
    val (firstHalf, secondHalf) = values.splitAt(values.length / 2)
    firstHalf.sum + "," + secondHalf.sum
  }

  /**
   * Reads the movie file at `PATH_MOVIES` as a `|`-delimited CSV.
   *
   * Only the first four columns are named by the schema: id, name, date and url,
   * all read as nullable strings.
   *
   * @param spark active Spark session
   * @return DataFrame with columns id, name, date, url
   */
  def getMovieDataDF(spark: SparkSession): DataFrame = {
    // Sample line:
    //1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)
    // |0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
    val customSchema = StructType(Array(
      StructField("id", StringType, true),
      StructField("name", StringType, true),
      StructField("date", StringType, true),
      StructField("url", StringType, true)))

    spark.read.format("com.databricks.spark.csv")
      .option("delimiter", "|").schema(customSchema)
      .load(PATH_MOVIES)
  }
}
开发者ID:PacktPublishing,项目名称:Machine-Learning-with-Spark-Second-Edition,代码行数:46,代码来源:Util.scala
示例20: NTriplesRelation
//设置package包名称以及导入依赖的类
package net.sansa_stack.inference.spark.data.loader.sql
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import net.sansa_stack.inference.utils.NTriplesStringToRDFTriple
/**
 * Relation that reads an N-Triples text file and exposes every triple as a three-column row.
 *
 * @param location   path passed to `SparkContext.textFile`
 * @param userSchema schema to report; when null, a default (s, p, o) string schema is used
 * @param sqlContext SQL context used to reach the SparkContext
 */
class NTriplesRelation(location: String, userSchema: StructType)
                      (@transient val sqlContext: SQLContext)
  extends BaseRelation
    with TableScan
    with Serializable {

  /** The user-supplied schema, or nullable string columns `s`, `p`, `o` if none was given. */
  override def schema: StructType =
    Option(this.userSchema).getOrElse {
      StructType(
        Seq(
          StructField("s", StringType, true),
          StructField("p", StringType, true),
          StructField("o", StringType, true)
        ))
    }

  /** Converts every line of the file into zero or more triples and each triple into a row. */
  override def buildScan(): RDD[Row] = {
    val lines = sqlContext
      .sparkContext
      .textFile(location)

    val converter = new NTriplesStringToRDFTriple()

    lines
      .flatMap(line => converter.apply(line))
      .map(triple => Row.fromSeq(Seq(triple.s, triple.p, triple.o)))
  }
}
开发者ID:SANSA-Stack,项目名称:SANSA-Inference,代码行数:40,代码来源:NTriplesRelation.scala
注:本文中的org.apache.spark.sql.types.StructType类示例整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论