This article collects typical usage examples of the Scala class org.apache.hadoop.mapreduce.lib.input.TextInputFormat. If you are wondering what TextInputFormat is for in Scala, how to use it, or what real code that uses it looks like, the curated class examples below should help.
Five code examples of the TextInputFormat class are shown below, sorted by popularity by default.
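Before diving into the examples, here is a minimal, self-contained sketch of the most common pattern: reading a plain text file through the new-API TextInputFormat with SparkContext.newAPIHadoopFile, where the key of each record is the byte offset of a line and the value is the line itself. The application name and the HDFS path below are placeholders, not taken from the examples on this page.

import org.apache.hadoop.io.{ LongWritable, Text }
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{ SparkConf, SparkContext }

object TextInputFormatSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("TextInputFormatSketch"))
    // Keys are byte offsets (LongWritable), values are individual lines (Text)
    val lines = sc.newAPIHadoopFile(
      "hdfs:///tmp/input.txt", // placeholder path
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text]).map(_._2.toString)
    println(lines.count())
    sc.stop()
  }
}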
Example 1: RedditVariationApp
// Set the package name and import the required classes
package org.apress.prospark

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Milliseconds, Seconds, StreamingContext }
import org.apache.hadoop.io.{ Text, LongWritable, IntWritable }
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.dstream.DStream
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{ TextOutputFormat => NewTextOutputFormat }
import org.apache.spark.streaming.dstream.PairDStreamFunctions
import org.apache.log4j.LogManager
import org.json4s._
import org.json4s.native.JsonMethods._
import java.text.SimpleDateFormat
import java.util.Date

object RedditVariationApp {

  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println(
        "Usage: RedditVariationApp <appname> <input_path>")
      System.exit(1)
    }
    val Seq(appName, inputPath) = args.toSeq

    val LOG = LogManager.getLogger(this.getClass)

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
    val ssc = new StreamingContext(conf, Seconds(1))
    LOG.info("Started at %d".format(ssc.sparkContext.startTime))

    // Stream Reddit comment records (JSON, one per line) from inputPath
    val comments = ssc.fileStream[LongWritable, Text, TextInputFormat](
      inputPath, (f: Path) => true, newFilesOnly = false).map(pair => pair._2.toString)

    // DStream variations: union, repartition, and a per-partition minimum by "created_utc"
    val merged = comments.union(comments)
    val repartitionedComments = comments.repartition(4)
    val rddMin = comments.glom().map(arr =>
      arr.minBy(rec => ((parse(rec) \ "created_utc").values.toString.toInt)))

    // The excerpt omits output operations; Spark Streaming requires at least one
    // registered output operation before start(), so print the per-partition minima
    rddMin.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Author: ZubairNabi, Project: prosparkstreaming, Lines: 50, Source file: L3-DStreamVariation.scala
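A note on the fileStream call in this example: the second argument is a Path filter and the third controls whether files already present in the directory are processed; the example accepts every path and reads pre-existing files as well. As an illustrative variation (not from the book's repository), the snippet below skips hidden and temporary files and only picks up files that arrive after the stream starts.

// `ssc` and `inputPath` refer to the values defined in Example 1
val visibleOnly = (p: Path) => !p.getName.startsWith(".") && !p.getName.startsWith("_")
val freshComments = ssc
  .fileStream[LongWritable, Text, TextInputFormat](inputPath, visibleOnly, newFilesOnly = true)
  .map(pair => pair._2.toString)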
Example 2: ADAMContextExtensions
// Set the package name and import the required classes
package org.bdgenomics.adam.rdd

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.api.java.StorageLevels
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.converters.FastaConverter
import org.bdgenomics.adam.rdd.contig.NucleotideContigFragmentRDD
import org.bdgenomics.utils.instrumentation.Metrics
import org.apache.spark.rdd.MetricsContext._
import org.bdgenomics.adam.rdd.feature.FeatureRDD
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.formats.avro.Feature

object ADAMContextExtensions {

  implicit class spExt(val sparkContext: SparkContext) extends HDFSFilesExtensions {

    def loadFastaPersistent(
      filePath: String,
      fragmentLength: Long = 10000L): NucleotideContigFragmentRDD = {
      // Read the FASTA file as (byte offset, line) pairs using the new-API TextInputFormat
      val fastaData: RDD[(LongWritable, Text)] = sparkContext.newAPIHadoopFile(
        filePath,
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text]
      )
      if (Metrics.isRecording) fastaData.instrument() else fastaData

      val remapData = fastaData.map(kv => (kv._1.get, kv._2.toString))

      // convert the RDD into contig fragments and cache it
      val fragmentRdd = FastaConverter(remapData, fragmentLength)
        .persist(StorageLevels.MEMORY_AND_DISK)

      NucleotideContigFragmentRDD(fragmentRdd)
    }

    def mergeFeatures(features: List[FeatureRDD]): Option[FeatureRDD] = features match {
      case Nil => None
      case head :: Nil => Some(head)
      case head :: tail =>
        // Fold the remaining FeatureRDDs into the first one, widening overlapping regions
        val merged = tail.foldLeft(head) {
          case (acc, feature) =>
            val joined = acc.broadcastRegionJoin(feature)
            acc.transform(_ => joined.rdd.map {
              case (one, two) =>
                one.setStart(Math.min(one.getStart, two.getStart))
                one.setEnd(Math.max(one.getEnd, two.getEnd))
                one
            })
        }
        Some(merged)
    }
  }
}
Author: antonkulaga, Project: adam-playground, Lines: 62, Source file: ADAMContextExtensions.scala
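The implicit class above adds its methods directly to SparkContext, so once it is imported, loadFastaPersistent and mergeFeatures can be called on any SparkContext instance. The following usage sketch is an assumption of how it might be wired up and is not part of the project; the FASTA path is a placeholder and featureRdds stands in for FeatureRDDs loaded elsewhere.

import org.apache.spark.{ SparkConf, SparkContext }
import org.bdgenomics.adam.rdd.ADAMContextExtensions._
import org.bdgenomics.adam.rdd.feature.FeatureRDD

val sc = new SparkContext(new SparkConf().setAppName("adam-extensions-demo"))
// Load a reference FASTA as cached contig fragments (placeholder path, smaller fragments)
val contigs = sc.loadFastaPersistent("hdfs:///data/reference.fasta", fragmentLength = 5000L)
// Merge feature sets loaded elsewhere; returns None for an empty list
val featureRdds: List[FeatureRDD] = List.empty
val mergedFeatures = sc.mergeFeatures(featureRdds)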
Example 3: VoyagerApp
// Set the package name and import the required classes
package org.apress.prospark

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

object VoyagerApp {

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println(
        "Usage: VoyagerApp <appname> <inputPath> <outputPath>")
      System.exit(1)
    }
    val Seq(appName, inputPath, outputPath) = args.toSeq

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
      .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Stream whitespace-delimited Voyager records as lines of text
    val voyager1 = ssc.fileStream[LongWritable, Text, TextInputFormat](
      inputPath, (f: Path) => true, newFilesOnly = false).map(pair => pair._2.toString)

    // Count records per year whose proton-flux readings (columns 18-27) exceed 1.0,
    // then write the yearly counts sorted in descending order
    voyager1.map(rec => {
      val attrs = rec.split("\\s+")
      ((attrs(0).toInt), attrs.slice(18, 28).map(_.toDouble))
    }).filter(pflux => pflux._2.exists(_ > 1.0)).map(rec => (rec._1, 1))
      .reduceByKey(_ + _)
      .transform(rec => rec.sortByKey(ascending = false, numPartitions = 1))
      .saveAsTextFiles(outputPath)

    ssc.start()
    ssc.awaitTermination()
  }
}
Author: ZubairNabi, Project: prosparkstreaming, Lines: 42, Source file: L4-1Voyager.scala
Example 4: VoyagerAppKryo
// Set the package name and import the required classes
package org.apress.prospark

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

object VoyagerAppKryo {

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println(
        "Usage: VoyagerAppKryo <appname> <inputPath> <outputPath>")
      System.exit(1)
    }
    val Seq(appName, inputPath, outputPath) = args.toSeq

    // Use Kryo serialization and register the custom ProtonFlux class
    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[ProtonFlux]))
    val ssc = new StreamingContext(conf, Seconds(10))

    val voyager1 = ssc.fileStream[LongWritable, Text, TextInputFormat](
      inputPath, (f: Path) => true, newFilesOnly = false).map(pair => pair._2.toString)

    // Parse each record into a ProtonFlux object (defined elsewhere in the project)
    val projected = voyager1.map(rec => {
      val attrs = rec.split("\\s+")
      new ProtonFlux(attrs(0), attrs(18), attrs(19), attrs(20), attrs(21),
        attrs(22), attrs(23), attrs(24), attrs(25), attrs(26), attrs(27),
        attrs(28))
    })
    val filtered = projected.filter(pflux => pflux.isSolarStorm)
    val yearlyBreakdown = filtered.map(rec => (rec.year, 1))
      .reduceByKey(_ + _)
      .transform(rec => rec.sortByKey(ascending = false))
    yearlyBreakdown.saveAsTextFiles(outputPath)

    ssc.start()
    ssc.awaitTermination()
  }
}
Author: ZubairNabi, Project: prosparkstreaming, Lines: 48, Source file: L4-4Kryo.scala
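ProtonFlux is defined elsewhere in the book's repository and is not shown on this page. As a rough idea of its shape, inferred only from how it is constructed and used above (a year string plus eleven flux readings, a year field, and an isSolarStorm predicate), a hypothetical sketch might look like the following; the 1.0 threshold and the field layout are assumptions, not the book's actual definition.

class ProtonFlux(yearStr: String, fluxReadings: Seq[String]) {
  // Auxiliary constructor matching the 12-argument call site in VoyagerAppKryo
  def this(yearStr: String, f1: String, f2: String, f3: String, f4: String, f5: String,
           f6: String, f7: String, f8: String, f9: String, f10: String, f11: String) =
    this(yearStr, Seq(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11))

  val year: Int = yearStr.toInt
  val flux: Seq[Double] = fluxReadings.map(_.toDouble)

  // Assumed threshold; the real class encodes the book's own solar-storm criterion
  def isSolarStorm: Boolean = flux.exists(_ > 1.0)
}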
Example 5: RedditAggregationApp
// Set the package name and import the required classes
package org.apress.prospark

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Milliseconds, Seconds, StreamingContext }
import org.apache.hadoop.io.{ Text, LongWritable, IntWritable }
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.dstream.DStream
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{ TextOutputFormat => NewTextOutputFormat }
import org.apache.spark.streaming.dstream.PairDStreamFunctions
import org.apache.log4j.LogManager
import org.json4s._
import org.json4s.native.JsonMethods._
import java.text.SimpleDateFormat
import java.util.Date

object RedditAggregationApp {

  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println(
        "Usage: RedditAggregationApp <appname> <input_path>")
      System.exit(1)
    }
    val Seq(appName, inputPath) = args.toSeq

    val LOG = LogManager.getLogger(this.getClass)

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
    val ssc = new StreamingContext(conf, Seconds(1))
    LOG.info("Started at %d".format(ssc.sparkContext.startTime))

    val comments = ssc.fileStream[LongWritable, Text, TextInputFormat](
      inputPath, (f: Path) => true, newFilesOnly = false).map(pair => pair._2.toString)

    // DStream aggregations: record count, per-value counts, and a word count over the "body" field
    val recCount = comments.count()
    val recCountValue = comments.countByValue()
    val totalWords = comments.map(rec => ((parse(rec) \ "body").values.toString))
      .flatMap(body => body.split(" "))
      .map(word => 1)
      .reduce(_ + _)

    // The excerpt omits output operations; Spark Streaming requires at least one
    // registered output operation before start(), so print the per-batch word totals
    totalWords.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Author: ZubairNabi, Project: prosparkstreaming, Lines: 52, Source file: L3-DStreamAggregation.scala
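Both Reddit examples rely on json4s to pull individual fields out of each JSON record: parse(rec) builds a JValue tree, the \ operator selects a field, and .values unwraps it to a plain Scala value. A standalone sketch of that pattern follows; the sample record and its field values are made up for illustration.

import org.json4s._
import org.json4s.native.JsonMethods._

val rec = """{"body": "hello world", "created_utc": "1433097600"}"""
val body = (parse(rec) \ "body").values.toString                    // "hello world"
val createdUtc = (parse(rec) \ "created_utc").values.toString.toInt // 1433097600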
Note: The org.apache.hadoop.mapreduce.lib.input.TextInputFormat class examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by their respective developers, and copyright remains with the original authors. Please consult the corresponding project's license before distributing or using the code, and do not republish without permission.