本文整理汇总了Scala中org.apache.spark.SparkContext类的典型用法代码示例。如果您正苦于以下问题:Scala SparkContext类的具体用法?Scala SparkContext怎么用?Scala SparkContext使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SparkContext类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: Histogram
//设置package包名称以及导入依赖的类
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
object Histogram{
def main(args:Array[String]){
val conf:SparkConf = new SparkConf().setAppName("Histogram").setMaster("local")
val sc:SparkContext = new SparkContext(conf)
val dataset1:RDD[String] = sc.textFile("/home/hadoop/spark/scala/mllib/core/data1")
val dataset2:RDD[String] = sc.textFile("/home/hadoop/spark/scala/mllib/core/data2");
val subRDD:RDD[String] = dataset1.subtract(dataset2)
val keyValueRDD:RDD[(String, String)] = subRDD.map(line => (line.split(",")(1), line.split(",")(0)))
val hist = keyValueRDD.countByKey
for((k,v) <- hist){
println(k + "===>" + v)
}
}
}
开发者ID:malli3131,项目名称:SparkApps,代码行数:19,代码来源:Histogram.scala
示例2: Person
//设置package包名称以及导入依赖的类
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
case class Person(name: String, age: Int)
object SimpleApp {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val info = List(("mike", 24), ("joe", 34), ("jack", 55))
val infoRDD = sc.parallelize(info)
val people = infoRDD.map(r => Person(r._1, r._2)).toDF()
people.registerTempTable("people")
val subDF = sqlContext.sql("select * from people where age > 30")
subDF.show()
}
}
开发者ID:mykumar,项目名称:SparkScalaInternalExperiements,代码行数:25,代码来源:SimpleApp.scala
示例3: LRCV
//设置package包名称以及导入依赖的类
package com.ferhtaydn.rater
import org.apache.spark.SparkContext
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{ StringIndexerModel, VectorAssembler }
import org.apache.spark.ml.tuning.{ CrossValidator, CrossValidatorModel, ParamGridBuilder }
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.sql.{ DataFrame, Row, SQLContext }
class LRCV(sc: SparkContext) {
implicit val sqlContext = new SQLContext(sc)
val lr = new LogisticRegression().setMaxIter(10).setFeaturesCol("scaledFeatures")
val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.01))
.build()
val assembler = new VectorAssembler()
.setInputCols(Array("gender", "age", "weight", "height", "indexedJob"))
.setOutputCol("features")
val pipeline = new Pipeline()
.setStages(Array(assembler, standardScaler("features"), lr))
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new BinaryClassificationEvaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(10)
def train(df: DataFrame): (StringIndexerModel, CrossValidatorModel, Matrix) = {
// need to index strings on all data to not missing the job fields.
// other alternative can be manually assign values for each job like gender.
val indexerModel = stringIndexer("job").fit(df)
val indexed = indexerModel.transform(df)
val splits = indexed.randomSplit(Array(0.8, 0.2))
val training = splits(0).cache()
val test = splits(1)
val cvModel = cv.fit(training)
val predictionAndLabels = cvModel
.transform(test)
.select("label", "prediction").map {
case Row(label: Double, prediction: Double) ?
(prediction, label)
}
printBinaryMetrics(predictionAndLabels)
(indexerModel, cvModel, confusionMatrix(predictionAndLabels))
}
}
开发者ID:ferhtaydn,项目名称:canceRater,代码行数:62,代码来源:LRCV.scala
示例4: TikaLanguageAggregationExample
//设置package包名称以及导入依赖的类
package com.jasonfeist.spark.tika.example
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object TikaLanguageAggregationExample {
def main(args: Array[String]) {
if (args.length == 0 || args(0) == null) {
return
}
val conf = new SparkConf().setAppName("Tika Language Aggregation Example")
val sc: SparkContext = new SparkContext(conf)
val sqlContext: SQLContext = new SQLContext(sc)
val df: DataFrame = sqlContext.read
.format("com.jasonfeist.spark.tika")
.load(args(0))
.groupBy("Language")
.count()
df.show
}
}
开发者ID:jasonfeist,项目名称:tika-spark-datasource,代码行数:24,代码来源:TikaLanguageAggregationExample.scala
示例5: RealEstateData
//设置package包名称以及导入依赖的类
package fr.grislain
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.{ SQLContext, DataFrame, Row }
import org.apache.spark.sql.types._
object RealEstateData {
println("Starting real_estate_price")
val conf = new SparkConf().setAppName("real_estate_price").setMaster("local")
val context = new SparkContext(conf)
val sqlContext = new SQLContext(context)
def dataFrame: DataFrame = {
val input = context.textFile("../data/insee_notaires.csv")
sqlContext.createDataFrame(input mapPartitions { _.drop(1) } map {
line =>
Row.fromSeq(line.split(",").view.zipWithIndex filter { e => e._2 > 0 } flatMap {
e =>
e match {
case (t, 1) => Seq(t.take(4).toInt, t.drop(5).toInt)
case (p, _) => Seq(p.toDouble)
}
})
},
StructType(StructField("year", IntegerType) ::
StructField("quarter", IntegerType) ::
StructField("75001", DoubleType) ::
StructField("75002", DoubleType) ::
StructField("75003", DoubleType) ::
StructField("75004", DoubleType) ::
StructField("75005", DoubleType) ::
StructField("75006", DoubleType) ::
StructField("75007", DoubleType) ::
StructField("75008", DoubleType) ::
StructField("75009", DoubleType) ::
StructField("75010", DoubleType) ::
StructField("75011", DoubleType) ::
StructField("75012", DoubleType) ::
StructField("75013", DoubleType) ::
StructField("75014", DoubleType) ::
StructField("75015", DoubleType) ::
StructField("75016", DoubleType) ::
StructField("75017", DoubleType) ::
StructField("75018", DoubleType) ::
StructField("75019", DoubleType) ::
StructField("75020", DoubleType) :: Nil))
}
}
开发者ID:ngrislain,项目名称:french_real_estate,代码行数:50,代码来源:RealEstateData.scala
示例6: MedianOfMediansCalculator
//设置package包名称以及导入依赖的类
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
class MedianOfMediansCalculator {
def calculateMedianOfMediansForFile(hdfsFilePath: String, sc: SparkContext): Double =
calculateMedianOfMedians(sortAndNumberMedians(calculateMediansPerLine(readFileOfIntegers(hdfsFilePath, sc))))
def readFileOfIntegers(hdfsFilePath: String, sc: SparkContext): RDD[Array[Int]] = {
sc.textFile(hdfsFilePath)
.map(line => line.split("\\D+"))
.map(lineParts => lineParts.map(number => number.toInt)
.sorted)
}
def calculateMediansPerLine(integerArrayRdd: RDD[Array[Int]]): RDD[Double] = {
integerArrayRdd.map { lineInts =>
if (lineInts.length % 2 == 0)
(lineInts(lineInts.length / 2) + lineInts((lineInts.length / 2) + 1)) / 2.0
else
lineInts((lineInts.length / 2) + 1)
}
}
def sortAndNumberMedians(lineMedians: RDD[Double]): RDD[(Long, Double)] = {
lineMedians
.sortBy(identity)
.zipWithIndex
.keyBy { case (_, index) => index }
.mapValues { case (value, _) => value }
}
def calculateMedianOfMedians(sortedAndNumberedMedians: RDD[(Long, Double)]): Double = {
if (sortedAndNumberedMedians.count() % 2 == 0)
sortedAndNumberedMedians.lookup((sortedAndNumberedMedians.count / 2) + 1).head + sortedAndNumberedMedians.lookup(sortedAndNumberedMedians.count / 2).head / 2.0
else
sortedAndNumberedMedians.lookup((sortedAndNumberedMedians.count / 2) + 1).head
}
}
开发者ID:qayshp,项目名称:medianOfMedians,代码行数:40,代码来源:MedianOfMediansCalculator.scala
示例7: SimpleApp
//设置package包名称以及导入依赖的类
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
object SimpleApp {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("jdbc").
option("url", "jdbc:mysql://statsdb02p-am-tor02:3306/aminno").
option("driver", "com.mysql.jdbc.Driver").
option("dbtable", "member").
option("user", System.getenv("MYSQL_USERNAME")).
option("password", System.getenv("MYSQL_PASSWORD")).
option("partitionColumn", "hp").
option("lowerBound", "0").
option("upperBound", "44000000").
option("numPartitions", "5").
load()
df.registerTempTable("achat")
val someRows = sqlContext.sql("select hp, count(distinct up) as cnt from achat group by hp order by cnt desc").head()
println("--------see here!------->" + someRows.mkString(" "))
}
}
开发者ID:mykumar,项目名称:SparkScalaInternalExperiements,代码行数:27,代码来源:SimpleApp.scala
示例8: ParquetS3Backup
//设置package包名称以及导入依赖的类
package com.unity.analytics.spark.utils.parquet
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{Logging, SparkConf, SparkContext}
object ParquetS3Backup extends Logging{
implicit val formats = org.json4s.DefaultFormats
def main(args: Array[String]): Unit = {
val config = new ParquetS3BackupConfiguration(args)
val sparkConf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sqlContext = new SQLContext(new SparkContext(sparkConf))
config.merge() match {
case true => merge(sqlContext, config.srcDir(), config.destDir(), config.numFiles())
case false => split(sqlContext, config.srcDir(), config.destDir(), config.numFiles())
}
}
// Reads, then merges Parquet files and writes to destDir
def merge(sqlContext: SQLContext, srcDir: String, destDir: String, destNumFiles: Int): Unit = {
logInfo(s"ParquetS3Backup merge - srcDir: $srcDir, destDir: $destDir, destNumFiles: $destNumFiles")
sqlContext.read.parquet(srcDir)
.coalesce(destNumFiles)
.write.mode(SaveMode.Overwrite).parquet(destDir)
}
// Reads, then splits Parquet files and writes to destDir
def split(sqlContext: SQLContext, srcDir: String, destDir: String, destNumFiles: Int): Unit = {
logInfo(s"ParquetS3Backup split - srcDir: $srcDir, destDir: $destDir, destNumFiles: $destNumFiles")
sqlContext.read.parquet(srcDir)
.repartition(destNumFiles)
.write.mode(SaveMode.Overwrite).parquet(destDir)
}
// Reads backupMetadata and does a Backup on each srcDir to destDir, to the set number of files
def backup(sqlContext: SQLContext, backupMetadata: BackupMetadata): Unit = {
backupMetadata.backupEntries.foreach(backupEntry => {
if (backupEntry.destNumFiles <= backupEntry.srcNumFiles) {
merge(sqlContext, backupEntry.srcDir, backupEntry.destDir, backupEntry.destNumFiles)
} else {
split(sqlContext, backupEntry.srcDir, backupEntry.destDir, backupEntry.destNumFiles)
}
})
}
// Reads backupMetadata and restores from destDir to the srcDir, bringing back the original number of files
def restore(sqlContext: SQLContext, backupMetadata: BackupMetadata): Unit = {
backupMetadata.backupEntries.foreach(backupEntry => {
if (backupEntry.srcNumFiles <= backupEntry.destNumFiles) {
merge(sqlContext, backupEntry.destDir, backupEntry.srcDir, backupEntry.srcNumFiles)
} else {
split(sqlContext, backupEntry.destDir, backupEntry.srcDir, backupEntry.srcNumFiles)
}
})
}
}
开发者ID:UnityTech,项目名称:parquet-s3-backup,代码行数:59,代码来源:ParquetS3Backup.scala
示例9: HappinessMeterDataManning
//设置package包名称以及导入依赖的类
package happiness.meter
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}
object HappinessMeterDataManning {
val Scala = "scala"
val Java = "java"
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("Happiness Meter Spark App Analyzer")
.set("spark.logConf", "true")
.set("spark.driver.port", "7778")
.set("spark.driver.host", "localhost")
.set("spark.akka.logLifecycleEvents", "true")
.set("spark.cassandra.connection.host", "127.0.0.1")
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "c:\\winutil\\")
val sparkContext = new SparkContext(sparkConf)
val tweets = sparkContext
.cassandraTable("ks", "twitter")
.map(row => (row.getString("tweet_class"), row.getString("tweet_text")))
.filter {
case (clazz, text) => !text.contains("tea") && !text.contains("hiring") && !text.contains("offer")
!text.toLowerCase.contains("jobsearch".toLowerCase)
}.cache()
println(s"total count: ${tweets.count()}")
tweets foreach println
sparkContext.stop()
}
def roundBy2(x: Double) = Math.round(x * 100) / 100.0
}
开发者ID:mronethere,项目名称:playground,代码行数:39,代码来源:HappinessMeterDataManning.scala
示例10: SimpleApp
//设置package包名称以及导入依赖的类
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SQLContext}
object SimpleApp {
val url = "jdbc:mysql://bigdata-master:3306/nlp"
val driver = "com.mysql.jdbc.Driver"
val user = System.getenv("MYSQL_USERNAME")
val pwd = System.getenv("MYSQL_PASSWORD")
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("jdbc").
option("url", url).
option("driver", driver).
option("dbtable", "msg").
option("user", user).
option("password", pwd).
load()
df.registerTempTable("t_msg")
val msgDF = sqlContext.sql("select message from t_msg")
msgDF.printSchema()
val cleaner = (msg: String) => {
msg.toLowerCase.split(" ").map((w: String) => w.replaceAll("[^a-zA-Z0-9]", "")).distinct
}
val wordDF = msgDF.explode("message", "word")((r: String) => cleaner(r))
wordDF.registerTempTable("words")
val wordCount = sqlContext.sql("select word, count(1) as cnt from words group by word order by cnt desc")
println(wordCount.count())
save(wordCount, "msg_word_count")
}
def save(dataFrame: DataFrame, table: String): Unit = {
val props = new java.util.Properties()
props.setProperty("user", user)
props.setProperty("password", pwd)
props.setProperty("driver", driver)
// create and save in table
dataFrame.write.jdbc(url, table, props)
}
}
开发者ID:mykumar,项目名称:SparkScalaInternalExperiements,代码行数:50,代码来源:SimpleApp.scala
示例11: Checkpoint
//设置package包名称以及导入依赖的类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf,SparkContext}
object Checkpoint{
def main(args:Array[String]){
val conf = new SparkConf().setAppName("RDD Aggregate").setMaster("local")
val sc = new SparkContext(conf)
sc.setCheckpointDir("./projdata")
val stocks = sc.textFile("./stocks")
val projdata = stocks.map(record => (record.split("\t")(1), record.split("\t")(7).toInt))
projdata.checkpoint()
println(projdata.count())
}
}
开发者ID:malli3131,项目名称:SparkApps,代码行数:15,代码来源:CheckPoint.scala
示例12: GroupWith
//设置package包名称以及导入依赖的类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf,SparkContext}
object GroupWith{
def main(args:Array[String]){
val conf = new SparkConf().setAppName("RDD Aggregate").setMaster("local")
val sc = new SparkContext(conf)
val citi = sc.textFile("./citi")
val hdfc = sc.textFile("./hdfc")
val sbi = sc.textFile("./sbi")
val citiPairRDD = citi.map(row => (row.split("\t")(0), row.split("\t")(1).toInt))
val hdfcPairRDD = hdfc.map(row => (row.split("\t")(0), row.split("\t")(1).toInt))
val sbiPairRDD = sbi.map(row => (row.split("\t")(0), row.split("\t")(1).toInt))
val groupRDD = citiPairRDD.groupWith(hdfcPairRDD, sbiPairRDD)
groupRDD.collect.foreach{println}
}
}
开发者ID:malli3131,项目名称:SparkApps,代码行数:18,代码来源:GroupWith.scala
示例13: Collector
//设置package包名称以及导入依赖的类
package com.databricks.apps.twitterClassifier
import com.google.gson.Gson
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.twitter.TwitterUtils
object Collector {
def doIt(options: CollectOptions, sc: SparkContext, ssc: StreamingContext) {
val tweetStream: DStream[String] = TwitterUtils.createStream(ssc, maybeTwitterAuth)
.map(new Gson().toJson(_))
var numTweetsCollected = 0L
tweetStream.foreachRDD { (rdd, time) =>
val count = rdd.count
if (count > 0) {
rdd.saveAsTextFile(options.tweetDirectory.getAbsolutePath)
numTweetsCollected += count
if (numTweetsCollected > options.numTweetsToCollect) System.exit(0)
}
}
ssc.start()
ssc.awaitTermination()
}
}
开发者ID:krish121,项目名称:Spark-reference-applications,代码行数:29,代码来源:Collect.scala
示例14: Classifier
//设置package包名称以及导入依赖的类
package edu.neu.coe.scala.spark.spam
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object Classifier extends App {
val conf = new SparkConf().setAppName("spam")
val sc = new SparkContext(conf)
val spam = sc.textFile("spam.txt")
val norm = sc.textFile("normal.txt")
val tf = new HashingTF(10000)
val spamFeatures = spam.map(email => tf.transform(email.split(" ")))
val normFeatures = norm.map(email => tf.transform(email.split(" ")))
val posExamples = spamFeatures.map(f => LabeledPoint(1, f))
val negExamples = normFeatures.map(f => LabeledPoint(0, f))
val trainingData = posExamples.union(negExamples)
trainingData.cache()
val model = new LogisticRegressionWithSGD().run(trainingData)
val posTest = tf.transform("Subject: Cheap Stuff From: <omg.fu> O M G GET cheap stuff by sending money to Robin Hillyard".split(" "))
val negTest = tf.transform("Subject: Spark From: Robin Hillyard<[email protected]> Hi Adam, I started studying Spark the other day".split(" "))
println(s"Prediction for positive test example: ${model.predict(posTest)}")
println(s"Prediction for negative test example: ${model.predict(negTest)}")
}
开发者ID:menezesl,项目名称:Scala-repo,代码行数:34,代码来源:Classifier.scala
示例15: driver
//设置package包名称以及导入依赖的类
import java.io._
import utils._
import SMOTE._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import breeze.linalg._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.{ArrayBuffer,Map}
object driver {
def main(args: Array[String]) {
val conf = new SparkConf()
val options = args.map { arg =>
arg.dropWhile(_ == '-').split('=') match {
case Array(opt, v) => (opt -> v)
case Array(opt) => (opt -> "")
case _ => throw new IllegalArgumentException("Invalid argument: "+arg)
}
}.toMap
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
val sc = new SparkContext(conf)
// read in general inputs
val inputDirectory = options.getOrElse("inputDirectory","")
val outputDirectory = options.getOrElse("outputDirectory","")
val numFeatures = options.getOrElse("numFeatures","0").toInt
val oversamplingPctg = options.getOrElse("oversamplingPctg","1.0").toDouble
val kNN = options.getOrElse("K","5").toInt
val delimiter = options.getOrElse("delimiter",",")
val numPartitions = options.getOrElse("numPartitions","20").toInt
SMOTE.runSMOTE(sc, inputDirectory, outputDirectory, numFeatures, oversamplingPctg, kNN, delimiter, numPartitions)
println("The algorithm has finished running")
sc.stop()
}
}
开发者ID:anathan90,项目名称:SparkSMOTE,代码行数:46,代码来源:driver.scala
示例16: LogUtils
//设置package包名称以及导入依赖的类
package org.iamShantanu101.spark.SentimentAnalyzer.utils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SparkContext}
object LogUtils extends Logging {
def setLogLevels(sparkContext: SparkContext) {
sparkContext.setLogLevel(Level.WARN.toString)
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
logInfo(
"""Setting log level to [WARN] for streaming executions.
|To override add a custom log4j.properties to the classpath.""".stripMargin)
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}
开发者ID:iamShantanu101,项目名称:Twitter-Sentiment-Analyzer-Apache-Spark-Mllib,代码行数:21,代码来源:LogUtils.scala
示例17: SQLContextSingleton
//设置package包名称以及导入依赖的类
package org.iamShantanu101.spark.SentimentAnalyzer.utils
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
object SQLContextSingleton {
@transient
@volatile private var instance: SQLContext = _
def getInstance(sparkContext: SparkContext): SQLContext = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = SQLContext.getOrCreate(sparkContext)
}
}
}
instance
}
}
开发者ID:iamShantanu101,项目名称:Twitter-Sentiment-Analyzer-Apache-Spark-Mllib,代码行数:23,代码来源:SQLContextSingleton.scala
示例18: HappinessMeterAnalyzer
//设置package包名称以及导入依赖的类
package happiness.meter
import org.apache.spark.{SparkContext, SparkConf}
import com.datastax.spark.connector._
object HappinessMeterAnalyzer {
val Scala = "scala"
val Java = "java"
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("Happiness Meter Spark App Analyzer")
.set("spark.logConf", "true")
.set("spark.driver.port", "7778")
.set("spark.driver.host", "localhost")
.set("spark.akka.logLifecycleEvents", "true")
.set("spark.cassandra.connection.host", "127.0.0.1")
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "c:\\winutil\\")
val sparkContext = new SparkContext(sparkConf)
val tweets = sparkContext
.cassandraTable("ks", "twitter")
.map(row => (row.getString("tweet_class"), row.getBoolean("tweet_happy")))
.countByValue()
val scalaHappyTweets = tweets((Scala, true))
val scalaUnhappyTweets = tweets((Scala, false))
val javaHappyTweets = tweets((Java, true))
val javaUnhappyTweets = tweets((Java, false))
val scalaSatisfactionPercentage = roundBy2((scalaHappyTweets * 100.0) / (scalaHappyTweets + scalaUnhappyTweets))
val javaSatisfactionPercentage = roundBy2((javaHappyTweets * 100.0) / (javaHappyTweets + javaUnhappyTweets))
println(
s"""
|Java vs Scala satisfaction twitter model (~ 2 hours of manning)
|Total tweets = ${scalaHappyTweets + scalaUnhappyTweets + javaHappyTweets + javaUnhappyTweets}
|Scala satisfaction = $scalaSatisfactionPercentage%
|Java satisfaction = $javaSatisfactionPercentage%
""".stripMargin)
sparkContext.stop()
}
def roundBy2(x: Double) = Math.round(x * 100) / 100.0
}
开发者ID:mronethere,项目名称:playground,代码行数:48,代码来源:HappinessMeterAnalyzer.scala
示例19: Context
//设置package包名称以及导入依赖的类
package org.hammerlab.spark
import org.apache.spark.{ SparkConf, SparkContext }
import org.hammerlab.hadoop.Configuration
case class Context(@transient sc: SparkContext)
extends Configuration(sc.hadoopConfiguration)
object Context {
implicit def makeContext(sc: SparkContext): Context = Context(sc)
implicit def umakeContext(context: Context): SparkContext = context.sc
def apply()(implicit conf: SparkConf): Context =
Context(
new SparkContext(
conf
)
)
}
开发者ID:hammerlab,项目名称:spark-util,代码行数:20,代码来源:Context.scala
示例20: Job
//设置package包名称以及导入依赖的类
package com.frontier45.sparktemplate
object Job {
private def mainTask(args: RunArgs)(implicit sc: SparkContext): Unit = {
val src = sc.textFile(args.fname, minPartitions = args.n_partitions)
println(src.count())
}
def start(args: RunArgs): Unit = {
val conf = new SparkConf()
conf.getOption("spark.app.name").getOrElse(conf.setAppName(appName))
conf.getOption("spark.master").getOrElse(conf.setMaster("local[*]"))
implicit val sc = new SparkContext(conf)
mainTask(args)
sc.stop()
}
}
开发者ID:lucidfrontier45,项目名称:SparkTemplate,代码行数:21,代码来源:Job.scala
注:本文中的org.apache.spark.SparkContext类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论