This article collects typical usage examples of the Scala class org.apache.spark.SparkConf. If you are wondering how SparkConf is used in practice, what its common configuration patterns look like, or where to find real code that exercises it, the curated examples below should help.
Twenty code examples of the SparkConf class are shown below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Scala code examples.
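Before diving into the examples, here is a minimal, self-contained sketch of the most common SparkConf pattern: build a configuration, hand it to a SparkContext, and stop the context when done. The object name, application name, and local master below are illustrative placeholders, not taken from any of the examples that follow.
import org.apache.spark.{SparkConf, SparkContext}

object SparkConfQuickStart {
  def main(args: Array[String]): Unit = {
    // Application name and master are illustrative; in a cluster deployment the
    // master is usually supplied via spark-submit instead of setMaster.
    val conf = new SparkConf()
      .setAppName("SparkConfQuickStart")
      .setMaster("local[*]")
      .set("spark.ui.enabled", "false") // any Spark property can be set as a key/value pair
    val sc = new SparkContext(conf)
    println(s"Spark ${sc.version}, app: ${sc.appName}")
    sc.stop()
  }
}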
Example 1: Histogram
// Set the package name and import the required classes
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object Histogram {
  def main(args: Array[String]) {
    val conf: SparkConf = new SparkConf().setAppName("Histogram").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val dataset1: RDD[String] = sc.textFile("/home/hadoop/spark/scala/mllib/core/data1")
    val dataset2: RDD[String] = sc.textFile("/home/hadoop/spark/scala/mllib/core/data2")
    val subRDD: RDD[String] = dataset1.subtract(dataset2)
    val keyValueRDD: RDD[(String, String)] = subRDD.map(line => (line.split(",")(1), line.split(",")(0)))
    val hist = keyValueRDD.countByKey
    for ((k, v) <- hist) {
      println(k + "===>" + v)
    }
  }
}
Author: malli3131, Project: SparkApps, Lines: 19, Source: Histogram.scala
Example 2: Person
// Set the package name and import the required classes
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val info = List(("mike", 24), ("joe", 34), ("jack", 55))
    val infoRDD = sc.parallelize(info)
    val people = infoRDD.map(r => Person(r._1, r._2)).toDF()
    people.registerTempTable("people")
    val subDF = sqlContext.sql("select * from people where age > 30")
    subDF.show()
  }
}
Author: mykumar, Project: SparkScalaInternalExperiements, Lines: 25, Source: SimpleApp.scala
Example 3: Conf
// Set the package name and import the required classes
package org.hammerlab.spark

import org.apache.spark.SparkConf
import org.hammerlab.paths.Path

object Conf {
  val propsLineRegex = """(\S+)\s+(.*)""".r

  def apply(loadDefaults: Boolean = true): SparkConf = {
    val envSparkPropertiesFiles =
      Option(System.getenv("SPARK_PROPERTIES_FILES"))
        .toList
        .flatMap(_.split(","))
        .filterNot(_.isEmpty)

    val sparkProperties =
      envSparkPropertiesFiles
        .flatMap {
          path =>
            Path(path)
              .lines
              .filter(_.trim.nonEmpty)
              .map {
                case propsLineRegex(key, value) =>
                  key -> value
                case line =>
                  throw new IllegalArgumentException(
                    s"Invalid property line in $path: '$line'"
                  )
              }
        }

    val sparkConf = new SparkConf()
    for {
      (k, v) <- sparkProperties
    } {
      sparkConf.set(k, v)
    }
    sparkConf
  }
}
Author: hammerlab, Project: spark-util, Lines: 45, Source: Conf.scala
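A short usage note on Example 3: Conf() builds a SparkConf from the files listed in the comma-separated SPARK_PROPERTIES_FILES environment variable, where each non-empty line holds a key and a value separated by whitespace. The sketch below is illustrative; the object name, file path, and property values are assumptions, not part of the hammerlab project.
// Hypothetical properties file referenced by SPARK_PROPERTIES_FILES,
// e.g. /etc/spark/defaults.properties, one "key value" pair per line:
//   spark.executor.memory   4g
//   spark.serializer        org.apache.spark.serializer.KryoSerializer

import org.apache.spark.SparkContext
import org.hammerlab.spark.Conf

object ConfUsage {
  def main(args: Array[String]): Unit = {
    val conf = Conf()                    // reads SPARK_PROPERTIES_FILES, if set
      .setAppName("conf-usage-sketch")   // further settings can still be layered on
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.stop()
  }
}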
Example 4: RealEstateData
// Set the package name and import the required classes
package fr.grislain

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.{ SQLContext, DataFrame, Row }
import org.apache.spark.sql.types._

object RealEstateData {
  println("Starting real_estate_price")
  val conf = new SparkConf().setAppName("real_estate_price").setMaster("local")
  val context = new SparkContext(conf)
  val sqlContext = new SQLContext(context)

  def dataFrame: DataFrame = {
    val input = context.textFile("../data/insee_notaires.csv")
    sqlContext.createDataFrame(
      input mapPartitions { _.drop(1) } map {
        line =>
          Row.fromSeq(line.split(",").view.zipWithIndex filter { e => e._2 > 0 } flatMap {
            e =>
              e match {
                case (t, 1) => Seq(t.take(4).toInt, t.drop(5).toInt)
                case (p, _) => Seq(p.toDouble)
              }
          })
      },
      StructType(StructField("year", IntegerType) ::
        StructField("quarter", IntegerType) ::
        StructField("75001", DoubleType) ::
        StructField("75002", DoubleType) ::
        StructField("75003", DoubleType) ::
        StructField("75004", DoubleType) ::
        StructField("75005", DoubleType) ::
        StructField("75006", DoubleType) ::
        StructField("75007", DoubleType) ::
        StructField("75008", DoubleType) ::
        StructField("75009", DoubleType) ::
        StructField("75010", DoubleType) ::
        StructField("75011", DoubleType) ::
        StructField("75012", DoubleType) ::
        StructField("75013", DoubleType) ::
        StructField("75014", DoubleType) ::
        StructField("75015", DoubleType) ::
        StructField("75016", DoubleType) ::
        StructField("75017", DoubleType) ::
        StructField("75018", DoubleType) ::
        StructField("75019", DoubleType) ::
        StructField("75020", DoubleType) :: Nil))
  }
}
Author: ngrislain, Project: french_real_estate, Lines: 50, Source: RealEstateData.scala
Example 5: RddAggregateByKey
// Set the package name and import the required classes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RddAggregateByKey {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("RDD Aggregate").setMaster("local")
    val sc = new SparkContext(conf)
    val stocks = sc.textFile("./stocks")
    val projdata = stocks.map(line => (line.split("\t")(1), line.split("\t")(7).toInt))
    val volMax = projdata.aggregateByKey(0)(math.max(_, _), math.max(_, _))
    val volMin = projdata.aggregateByKey(100000000)(math.min(_, _), math.min(_, _))
    val aggRdd = volMax ++ volMin
    aggRdd.saveAsTextFile("./volume")
  }
}
Author: malli3131, Project: SparkApps, Lines: 16, Source: RddAggregateByKey.scala
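For readers new to aggregateByKey, here is a minimal self-contained sketch of the same per-key maximum pattern, using a small in-memory RDD instead of the tab-separated ./stocks file assumed in Example 5; the object name and the symbol/volume data are made up for illustration.
import org.apache.spark.{SparkConf, SparkContext}

object AggregateByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("aggregateByKey sketch").setMaster("local"))
    // (symbol, daily volume) pairs, illustrative data only
    val volumes = sc.parallelize(Seq(("AAA", 100), ("AAA", 250), ("BBB", 75), ("BBB", 300)))
    // The zero value is the per-partition starting accumulator; both the within-partition
    // and cross-partition functions are max here, so the result is the per-key maximum.
    val maxVolume = volumes.aggregateByKey(0)(math.max(_, _), math.max(_, _))
    maxVolume.collect().foreach(println) // e.g. (AAA,250), (BBB,300)
    sc.stop()
  }
}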
Example 6: SimpleApp
// Set the package name and import the required classes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.format("jdbc").
      option("url", "jdbc:mysql://statsdb02p-am-tor02:3306/aminno").
      option("driver", "com.mysql.jdbc.Driver").
      option("dbtable", "member").
      option("user", System.getenv("MYSQL_USERNAME")).
      option("password", System.getenv("MYSQL_PASSWORD")).
      option("partitionColumn", "hp").
      option("lowerBound", "0").
      option("upperBound", "44000000").
      option("numPartitions", "5").
      load()
    df.registerTempTable("achat")
    val someRows = sqlContext.sql("select hp, count(distinct up) as cnt from achat group by hp order by cnt desc").head()
    println("--------see here!------->" + someRows.mkString(" "))
  }
}
Author: mykumar, Project: SparkScalaInternalExperiements, Lines: 27, Source: SimpleApp.scala
Example 7: ParquetS3Backup
// Set the package name and import the required classes
package com.unity.analytics.spark.utils.parquet

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{Logging, SparkConf, SparkContext}

object ParquetS3Backup extends Logging {
  implicit val formats = org.json4s.DefaultFormats

  def main(args: Array[String]): Unit = {
    val config = new ParquetS3BackupConfiguration(args)
    val sparkConf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sqlContext = new SQLContext(new SparkContext(sparkConf))
    config.merge() match {
      case true => merge(sqlContext, config.srcDir(), config.destDir(), config.numFiles())
      case false => split(sqlContext, config.srcDir(), config.destDir(), config.numFiles())
    }
  }

  // Reads, then merges Parquet files and writes to destDir
  def merge(sqlContext: SQLContext, srcDir: String, destDir: String, destNumFiles: Int): Unit = {
    logInfo(s"ParquetS3Backup merge - srcDir: $srcDir, destDir: $destDir, destNumFiles: $destNumFiles")
    sqlContext.read.parquet(srcDir)
      .coalesce(destNumFiles)
      .write.mode(SaveMode.Overwrite).parquet(destDir)
  }

  // Reads, then splits Parquet files and writes to destDir
  def split(sqlContext: SQLContext, srcDir: String, destDir: String, destNumFiles: Int): Unit = {
    logInfo(s"ParquetS3Backup split - srcDir: $srcDir, destDir: $destDir, destNumFiles: $destNumFiles")
    sqlContext.read.parquet(srcDir)
      .repartition(destNumFiles)
      .write.mode(SaveMode.Overwrite).parquet(destDir)
  }

  // Reads backupMetadata and backs up each srcDir to destDir, with the configured number of files
  def backup(sqlContext: SQLContext, backupMetadata: BackupMetadata): Unit = {
    backupMetadata.backupEntries.foreach(backupEntry => {
      if (backupEntry.destNumFiles <= backupEntry.srcNumFiles) {
        merge(sqlContext, backupEntry.srcDir, backupEntry.destDir, backupEntry.destNumFiles)
      } else {
        split(sqlContext, backupEntry.srcDir, backupEntry.destDir, backupEntry.destNumFiles)
      }
    })
  }

  // Reads backupMetadata and restores from destDir to srcDir, bringing back the original number of files
  def restore(sqlContext: SQLContext, backupMetadata: BackupMetadata): Unit = {
    backupMetadata.backupEntries.foreach(backupEntry => {
      if (backupEntry.srcNumFiles <= backupEntry.destNumFiles) {
        merge(sqlContext, backupEntry.destDir, backupEntry.srcDir, backupEntry.srcNumFiles)
      } else {
        split(sqlContext, backupEntry.destDir, backupEntry.srcDir, backupEntry.srcNumFiles)
      }
    })
  }
}
Author: UnityTech, Project: parquet-s3-backup, Lines: 59, Source: ParquetS3Backup.scala
Example 8: MyApp
// Set the package name and import the required classes
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.{Level, Logger}

object MyApp {
  def main(args: Array[String]) {
    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)
    val file = args(0)
    val conf = new SparkConf(true).setAppName("My Application")
    val sc = new SparkContext(conf)
    val tf = sc.textFile(file, 2)
    val splits = tf.flatMap(line => line.split(" ")).map(word => (word, 1))
    val counts = splits.reduceByKey((x, y) => x + y)
    System.out.println(counts.collect().mkString(", "))
    sc.stop()
  }
}
Author: gibrano, Project: docker-spark, Lines: 20, Source: myapp.scala
Example 9: KuduAccountMartSimpleSums
// Set the package name and import the required classes
package com.hadooparchitecturebook.taxi360.sql.kudu

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object KuduAccountMartSimpleSums {
  def main(args: Array[String]): Unit = {
    if (args.length == 0) {
      println("Args: <runLocal> <kuduMaster> " +
        "<kuduAccountMartTableName> ")
      return
    }
    val runLocal = args(0).equalsIgnoreCase("l")
    val kuduMaster = args(1)
    val kuduAccountMartTableName = args(2)

    val sc: SparkContext = if (runLocal) {
      val sparkConfig = new SparkConf()
      sparkConfig.set("spark.broadcast.compress", "false")
      sparkConfig.set("spark.shuffle.compress", "false")
      sparkConfig.set("spark.shuffle.spill.compress", "false")
      new SparkContext("local", "TableStatsSinglePathMain", sparkConfig)
    } else {
      val sparkConfig = new SparkConf().setAppName("TableStatsSinglePathMain")
      new SparkContext(sparkConfig)
    }

    val hiveContext = new HiveContext(sc)
    val kuduOptions = Map(
      "kudu.table" -> kuduAccountMartTableName,
      "kudu.master" -> kuduMaster)

    hiveContext.read.options(kuduOptions).format("org.kududb.spark.kudu").load.
      registerTempTable("account_mart_tmp")

    println("------------")
    val values = hiveContext.sql("select account_id, sum(win_count) from account_mart_tmp group by account_id").
      take(100)
    println("------------")
    values.foreach(println)
    println("------------")
    sc.stop()
  }
}
Author: hadooparchitecturebook, Project: Taxi360, Lines: 51, Source: KuduAccountMartSimpleSums.scala
Example 10: PrintMetrics
// Set the package name and import the required classes
package com.github.jongwook

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object PrintMetrics extends App {
  val (prediction, labels) = RankingDataProvider(MovieLensLoader.load())

  val spark = SparkSession.builder().master(new SparkConf().get("spark.master", "local[8]")).getOrCreate()

  val metrics = new SparkRankingMetrics(spark.createDataFrame(prediction), spark.createDataFrame(labels), itemCol = "product", predictionCol = "rating")

  val ats = Seq(5, 10, 20, 100, Integer.MAX_VALUE)

  val toPrint = Map[String, SparkRankingMetrics => Seq[Int] => Seq[Double]](
    "Precision" -> { m => k => m.precisionAt(k) },
    "Recall" -> { m => k => m.recallAt(k) },
    "F1" -> { m => k => m.f1At(k) },
    "NDCG" -> { m => k => m.ndcgAt(k) },
    "MAP" -> { m => k => m.mapAt(k) },
    "MRR" -> { m => k => m.mrrAt(k) }
  )

  for ((metric, calculator) <- toPrint) {
    printf("%12s", metric)
    val f = calculator(metrics)
    for (x <- f(ats)) {
      printf("%12.8f", x)
    }
    println()
  }
}
Author: jongwook, Project: spark-ranking-metrics, Lines: 33, Source: PrintMetrics.scala
Example 11: SparkSQL
// Set the package name and import the required classes
package utils

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext

object SparkSQL {
  def simpleSparkSQLApp {
    val driverHost = "localhost"
    val conf = new SparkConf(false) // skip loading external settings
      .setMaster("local[4]") // run locally with enough threads
      .setAppName("firstSparkApp")
      .set("spark.logConf", "true")
      .set("spark.driver.host", s"$driverHost")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val path = "ecommerce-event-data.json"
    val returnType = sqlContext.read.json(path)
    returnType.select("user_name").collect().foreach { x => println(x.getString(0)) }
    returnType.printSchema()
  }
}
Author: listaction, Project: spark-test-1, Lines: 32, Source: SparkSQL.scala
Example 12: T01
// Set the package name and import the required classes
package streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object T01 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    // (an explicit StreamingContext._ import is not necessary since Spark 1.3)
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
    // To test: nc -lk 9999
    // ./bin/run-example streaming.T01 localhost 9999
  }
}
Author: IMJIU, Project: Spark1.6, Lines: 27, Source: T01.scala
Example 13: Classifier
// Set the package name and import the required classes
package edu.neu.coe.scala.spark.spam

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Classifier extends App {
  val conf = new SparkConf().setAppName("spam")
  val sc = new SparkContext(conf)
  val spam = sc.textFile("spam.txt")
  val norm = sc.textFile("normal.txt")

  val tf = new HashingTF(10000)
  val spamFeatures = spam.map(email => tf.transform(email.split(" ")))
  val normFeatures = norm.map(email => tf.transform(email.split(" ")))

  val posExamples = spamFeatures.map(f => LabeledPoint(1, f))
  val negExamples = normFeatures.map(f => LabeledPoint(0, f))
  val trainingData = posExamples.union(negExamples)
  trainingData.cache()

  val model = new LogisticRegressionWithSGD().run(trainingData)

  val posTest = tf.transform("Subject: Cheap Stuff From: <omg.fu> O M G GET cheap stuff by sending money to Robin Hillyard".split(" "))
  val negTest = tf.transform("Subject: Spark From: Robin Hillyard<[email protected]> Hi Adam, I started studying Spark the other day".split(" "))

  println(s"Prediction for positive test example: ${model.predict(posTest)}")
  println(s"Prediction for negative test example: ${model.predict(negTest)}")
}
Author: menezesl, Project: Scala-repo, Lines: 34, Source: Classifier.scala
Example 14: sparkConfs
// Set the package name and import the required classes
package org.hammerlab.spark

import org.apache.spark.SparkConf
import scala.collection.mutable

trait SparkConfBase {
  private val _sparkConfs = mutable.Map[String, String]()

  protected def sparkConfs: Map[String, String] = _sparkConfs.toMap

  protected def makeSparkConf: SparkConf = {
    val sparkConf = new SparkConf()
    for {
      (k, v) <- _sparkConfs
    } {
      sparkConf.setIfMissing(k, v)
    }
    sparkConf
  }

  protected def sparkConf(confs: (String, String)*): Unit =
    for {
      (k, v) <- confs
    } {
      _sparkConfs(k) = v
    }
}
Author: hammerlab, Project: spark-util, Lines: 29, Source: SparkConfBase.scala
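A sketch of how a driver object might mix in the SparkConfBase trait from Example 14: defaults are registered with sparkConf(...) and applied via setIfMissing, so values supplied externally (for example through spark-submit) still take precedence. The object name, property choices, and application name are illustrative assumptions.
import org.apache.spark.SparkContext
import org.hammerlab.spark.SparkConfBase

object SparkConfBaseUsage extends SparkConfBase {
  // Register defaults; makeSparkConf applies them with setIfMissing, so values
  // already present in the environment or passed to spark-submit win.
  sparkConf(
    "spark.app.name" -> "conf-base-sketch",
    "spark.master" -> "local[2]",
    "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer"
  )

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(makeSparkConf)
    println(sc.getConf.get("spark.serializer"))
    sc.stop()
  }
}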
Example 15: Context
// Set the package name and import the required classes
package org.hammerlab.spark

import org.apache.spark.{ SparkConf, SparkContext }
import org.hammerlab.hadoop.Configuration

case class Context(@transient sc: SparkContext)
  extends Configuration(sc.hadoopConfiguration)

object Context {
  implicit def makeContext(sc: SparkContext): Context = Context(sc)
  implicit def umakeContext(context: Context): SparkContext = context.sc

  def apply()(implicit conf: SparkConf): Context =
    Context(
      new SparkContext(
        conf
      )
    )
}
Author: hammerlab, Project: spark-util, Lines: 20, Source: Context.scala
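A brief sketch of how the implicit conversions in Example 15 might be used: wherever a Context is expected, a plain SparkContext can be passed, and a Context can stand in for its underlying SparkContext. The object name and the describe helper below are hypothetical.
import org.apache.spark.{SparkConf, SparkContext}
import org.hammerlab.spark.Context

object ContextUsage {
  // Accepts the project's Context, which wraps a SparkContext's Hadoop configuration
  def describe(ctx: Context): String = ctx.sc.appName

  def main(args: Array[String]): Unit = {
    implicit val conf: SparkConf = new SparkConf().setAppName("context-sketch").setMaster("local")
    val ctx = Context()        // builds a SparkContext from the implicit SparkConf
    println(describe(ctx.sc))  // the SparkContext converts implicitly to Context here
    ctx.sc.stop()
  }
}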
Example 16: Cogroup
// Set the package name and import the required classes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Cogroup {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("RDD Aggregate").setMaster("local")
    val sc = new SparkContext(conf)
    val citi = sc.textFile("./citi")
    val hdfc = sc.textFile("./hdfc")
    val sbi = sc.textFile("./sbi")
    val citiPairRDD = citi.map(row => (row.split("\t")(0), row.split("\t")(1).toInt))
    val hdfcPairRDD = hdfc.map(row => (row.split("\t")(0), row.split("\t")(1).toInt))
    val sbiPairRDD = sbi.map(row => (row.split("\t")(0), row.split("\t")(1).toInt))
    val groupRDD = citiPairRDD.cogroup(hdfcPairRDD, sbiPairRDD)
    groupRDD.collect.foreach { println }
  }
}
Author: malli3131, Project: SparkApps, Lines: 18, Source: Cogroup.scala
Example 17: getFpModelLift
// Set the package name and import the required classes
package ch02

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd._
import org.apache.spark.mllib.fpm._

// The opening line of the enclosing object is missing from the original excerpt;
// the name `RecSys` is inferred from the source file name shown below.
object RecSys {

  // The original excerpt is also truncated here: a method that computed `txCnt` (the total
  // transaction count) and `model` (an FPGrowthModel[String]) ended with this call:
  //   val modelwlift = getFpModelLift(sc, txCnt, model)

  def getFpModelLift(sc: SparkContext, txCnt: Long, fpModel: FPGrowthModel[String])
      : RDD[(Double, (Array[String], Long, Long, Double, Double, Double))] = {
    // Support of each single item: frequency divided by the total transaction count
    val fpItMap = fpModel.freqItemsets.filter {
      itset => itset.items.length == 1
    }.map {
      itset => (itset.items(0), itset.freq.toDouble / txCnt.toDouble)
    }.collectAsMap()
    val fpItBc = sc.broadcast(fpItMap)
    // Compute the lift of every frequent itemset with more than one item
    val fpmodelwLift = fpModel.freqItemsets.mapPartitions { ite =>
      val fpItMap = fpItBc.value
      ite.filter {
        itset => itset.items.length > 1
      }.map { itset =>
        val indProb = itset.items.map {
          item => fpItMap.getOrElse(item, 1.0D)
        }.toList.reduce((a, b) => a * b)
        val itsetProb = itset.freq.toDouble / txCnt.toDouble
        (itset.items, itset.freq, txCnt, indProb, itsetProb, itsetProb / indProb)
      }
    // Sort by lift in descending order
    }.map {
      case (items, freq, txCnt, indProb, itsetProb, lift) => (lift, (items, freq, txCnt, indProb, itsetProb, lift))
    }.sortByKey(false)
    fpmodelwLift
  }

  def main(args: Array[String]) {
    val appName = "Wifi Analysis"
    val sparkConf = new SparkConf().setAppName(appName)
    val sc = new SparkContext(sparkConf)
  }
}
Author: leoricklin, Project: spark-adv, Lines: 45, Source: RecSys.scala
Example 18: SAMAddRecordGroupDictionary
// Set the package name and import the required classes
package com.github.xubo245.gcdss.ETL

import org.apache.spark.{SparkConf, SparkContext}
import org.bdgenomics.adam.models.{RecordGroupDictionary, RecordGroup}
import org.bdgenomics.adam.rdd.ADAMContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
import org.bdgenomics.adam.rdd.variant.VariantRDD

object SAMAddRecordGroupDictionary {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SAMAddRecordGroupDictionary")
    if (System.getProperties.getProperty("os.name").contains("Windows")) {
      conf.setMaster("local[16]")
    }
    val sc = new SparkContext(conf)
    val ac = new ADAMContext(sc)
    val alignment = sc.loadAlignments(args(0))

    // Build the record group dictionary
    // val readGroup = new RecordGroup("sample", "sample")
    var recordGroupName = "machine foo"
    var recordGroupSample = "sample"
    val readGroup = new RecordGroup(recordGroupSample, recordGroupName, library = Some("library bar"))
    val readGroups = new RecordGroupDictionary(Seq(readGroup))

    var RDD = alignment.rdd.map { each =>
      each.setRecordGroupName(recordGroupName)
      each.setRecordGroupSample(recordGroupSample)
      each
    }
    // VariantRDD(variantsChr, alignment.sequences, variants.headerLines).saveAsParquet(args(2))
    AlignmentRecordRDD(RDD, alignment.sequences, readGroups).saveAsParquet(args(1))
    sc.stop()
  }
}
Author: xubo245, Project: GCDSS, Lines: 36, Source: SAMAddRecordGroupDictionary.scala
Example 19: LoadVCF
// Set the package name and import the required classes
package com.github.xubo245.gcdss.load

import htsjdk.samtools.util.Log
import org.apache.spark.{SparkContext, SparkConf}
import org.bdgenomics.adam.rdd.ADAMContext
import org.bdgenomics.adam.rdd.ADAMContext._

object LoadVCF {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("LoadVCF").setMaster("local[16]")
    Log.setGlobalLogLevel(Log.LogLevel.ERROR)
    // .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val ac = new ADAMContext(sc)
    // val vcfFile = "file\\callVariant\\input\\vcf\\All_20160407L1000000.vcf"
    // val vcfFile = "file\\callVariant\\input\\vcf\\vcfSelect.vcf20170405220837562"
    // val vcfFile = "file\\callVariant\\input\\vcf\\vcfSelect.vcf20170405221247894"
    // val vcfFile = "hdfs://219.219.220.149:9000/xubo/callVariant/vcf/vcfSelect.vcf20170406035127942"
    val vcfFile = args(0)
    compute(sc, vcfFile)
    sc.stop()
  }

  def compute(sc: SparkContext, vcfFile: String): Unit = {
    val vcfRDD = sc.loadVariants(vcfFile)
    println("vcfRDD.rdd.count():" + vcfRDD.rdd.count())
    vcfRDD.rdd.take(10).foreach(println)
    println("headerLines:")
    vcfRDD.headerLines.foreach(println)
    println("records:")
    vcfRDD.sequences.records.foreach(println)
    var record = vcfRDD.sequences.records
    println(record.size)
    var oneRDD = vcfRDD.rdd.filter { each =>
      each.getAlternateAllele.length == 1
    }
    var twoRDD = vcfRDD.rdd.filter { each =>
      each.getAlternateAllele.length == 2
    }
    var threeRDD = vcfRDD.rdd.filter { each =>
      each.getAlternateAllele.length == 3
    }
    println("******genotype:" + vcfRDD.rdd.count())
    vcfRDD.rdd.take(20).foreach(println)
    println("******oneRDD:" + oneRDD.count())
    println("******twoRDD:" + twoRDD.count())
    twoRDD.take(20).foreach(println)
    println("******threeRDD:" + threeRDD.count())
    threeRDD.take(20).foreach(println)
  }
}
Author: xubo245, Project: GCDSS, Lines: 55, Source: LoadVCF.scala
Example 20: Main
// Set the package name and import the required classes
import Fqueue.{FqueueReceiver, FqueueSender}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Main {
  private def sendData() = {
    val fqueuSender = new FqueueSender("localhost:18740", 4, 4000)
    fqueuSender.connect()
    while (true) {
      val ret = fqueuSender.enQueue("track_BOdao2015*", "123")
      Thread.sleep(1000)
    }
    fqueuSender.stop()
  }

  private def getData() = {
    val fqueueReceiver = new FqueueReceiver("localhost:18740", 4, 4000)
    fqueueReceiver.connect()
    val data = fqueueReceiver.deQueue("track_BOdao2015*")
    println(data.getOrElse("null"))
    fqueueReceiver.stop()
  }

  def main(args: Array[String]) {
    new Thread("fqueue sender") {
      override def run() { sendData() }
    }.start()
    val config = new SparkConf().setAppName("testfqueue").setMaster("local[2]")
    val ssc = new StreamingContext(config, Seconds(5))
    val lines = ssc.receiverStream(new FqueueStreamingReceiver("localhost:18740", 4, 4000))
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Author: TopSpoofer, Project: FqueueStreamingReceiver, Lines: 38, Source: Main.scala
Note: The org.apache.spark.SparkConf examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The code snippets are taken from open-source projects contributed by their respective authors, who retain the copyright; consult each project's license before redistributing or reusing the code. Do not republish without permission.