This article collects typical usage examples of the org.apache.hadoop.io.Text class in Scala. If you are wondering what exactly the Text class is for, how to use it, or what real-world Scala code that uses it looks like, the curated examples below should help.
A total of 20 code examples of the Text class are shown below, sorted by popularity by default.
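Before diving into the examples, it helps to recall what Text is: Hadoop's mutable Writable wrapper around a UTF-8 encoded string, used as a key or value type in MapReduce jobs and in Hadoop-backed Spark and Flink input formats. The following minimal, framework-independent sketch (written for this article, not taken from any of the projects below) shows the parts of the API the examples rely on most:

import org.apache.hadoop.io.Text

object TextBasics {
  def main(args: Array[String]): Unit = {
    val t = new Text("hadoop")        // contents are stored as UTF-8 bytes
    t.set("hello")                    // Text is mutable and is typically reused
    println(t.toString)               // decode back to a java.lang.String -> "hello"
    println(t.getLength)              // length in bytes, not characters -> 5
    println(t.find("llo"))            // byte offset of a substring, -1 if absent -> 2
    val copy = new Text(t)            // copy constructor
    println(t.compareTo(copy) == 0)   // WritableComparable, so usable as a MapReduce key -> true
  }
}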
Example 1: FPMiningPreprocessingApp
// Set the package name and import the required classes
package org.apress.prospark

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

import com.google.common.io.Files

object FPMiningPreprocessingApp {

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println(
        "Usage: FPMiningPreprocessingApp <appname> <inputpath> <outputpath>")
      System.exit(1)
    }
    val Seq(appName, iPath, oPath) = args.toSeq

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)

    val delim = " "

    val sc = new SparkContext(conf)
    sc.hadoopFile(iPath, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], sc.defaultMinPartitions)
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((iSplit, iter) =>
        iter.map(splitAndLine => (Files.getNameWithoutExtension(iSplit.asInstanceOf[FileSplit].getPath.toString), splitAndLine._2.toString.split(" ")(1))))
      .filter(r => r._2 != "0")
      .map(r => (r._1, r._2))
      .distinct()
      .groupByKey()
      .map(r => r._2.mkString(" "))
      .sample(false, 0.7)
      .coalesce(1)
      .saveAsTextFile(oPath)
  }
}
Author: ZubairNabi, Project: prosparkstreaming, Lines of code: 45, Source file: L9-13FPMiningPreprocessing.scala
Example 2: RedditVariationApp
// Set the package name and import the required classes
package org.apress.prospark

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Milliseconds, Seconds, StreamingContext }
import org.apache.hadoop.io.{ Text, LongWritable, IntWritable }
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.dstream.DStream
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{ TextOutputFormat => NewTextOutputFormat }
import org.apache.spark.streaming.dstream.PairDStreamFunctions
import org.apache.log4j.LogManager
import org.json4s._
import org.json4s.native.JsonMethods._
import java.text.SimpleDateFormat
import java.util.Date

object RedditVariationApp {
  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println(
        "Usage: RedditVariationApp <appname> <input_path>")
      System.exit(1)
    }
    val Seq(appName, inputPath) = args.toSeq
    val LOG = LogManager.getLogger(this.getClass)

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)

    val ssc = new StreamingContext(conf, Seconds(1))
    LOG.info("Started at %d".format(ssc.sparkContext.startTime))

    val comments = ssc.fileStream[LongWritable, Text, TextInputFormat](inputPath, (f: Path) => true, newFilesOnly = false).map(pair => pair._2.toString)

    val merged = comments.union(comments)

    val repartitionedComments = comments.repartition(4)

    val rddMin = comments.glom().map(arr =>
      arr.minBy(rec => ((parse(rec) \ "created_utc").values.toString.toInt)))

    ssc.start()
    ssc.awaitTermination()
  }
}
Author: ZubairNabi, Project: prosparkstreaming, Lines of code: 50, Source file: L3-DStreamVariation.scala
Example 3: FrequencyMapReducer
// Set the package name and import the required classes
package com.argcv.cse8803.mapreducebasic

import com.argcv.valhalla.console.ColorForConsole._
import com.argcv.valhalla.utils.Awakable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

object FrequencyMapReducer extends Awakable {

  def main(args: Array[String]): Unit = {
    // create a hadoop job and set main class
    val job = Job.getInstance()
    job.setJarByClass(FrequencyMapReducer.getClass)
    job.setJobName("Frequency")

    // set the input & output path
    FileInputFormat.addInputPath(job, new Path(args.head))
    FileOutputFormat.setOutputPath(job, new Path(s"${args(1)}-${System.currentTimeMillis()}"))

    // set mapper & reducer
    job.setMapperClass(FrequencyMapper.instance)
    job.setReducerClass(FrequencyReducer.instance)

    // specify the type of the output
    job.setOutputKeyClass(new Text().getClass)
    job.setOutputValueClass(new IntWritable().getClass)

    // run
    logger.info(s"job finished, status [${if (job.waitForCompletion(true)) "OK".withColor(GREEN) else "FAILED".withColor(RED)}]")
  }
}
Author: yuikns, Project: cse8803, Lines of code: 37, Source file: FrequencyMapReducer.scala
Example 4: FunctionalSyntaxOWLExpressionsDataSetBuilder
// Set the package name and import the required classes
package net.sansa_stack.owl.flink.dataset

import net.sansa_stack.owl.common.parsing.{FunctionalSyntaxExpressionBuilder, FunctionalSyntaxPrefixParsing}
import net.sansa_stack.owl.flink.hadoop.FunctionalSyntaxInputFormat
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.hadoop.io.{LongWritable, Text}

object FunctionalSyntaxOWLExpressionsDataSetBuilder extends FunctionalSyntaxPrefixParsing {
  def build(env: ExecutionEnvironment, filePath: String): OWLExpressionsDataSet = {
    import org.apache.flink.api.scala._

    val hadoopDataSet: DataSet[(LongWritable, Text)] =
      env.readHadoopFile[LongWritable, Text](
        new FunctionalSyntaxInputFormat,
        classOf[LongWritable],
        classOf[Text],
        filePath
      )
    val rawDataSet = hadoopDataSet.map(_._2.toString)

    val tmp: Seq[(String, String)] = rawDataSet.filter(isPrefixDeclaration(_)).map(parsePrefix(_)).collect()
    val prefixes: Map[String, String] = tmp.toMap

    val builder = new FunctionalSyntaxExpressionBuilder(prefixes)

    rawDataSet.map(builder.clean(_)).filter(_ != null)
  }
}
Author: SANSA-Stack, Project: SANSA-OWL, Lines of code: 30, Source file: FunctionalSyntaxOWLExpressionsDataSetBuilder.scala
Example 5: ManchesterSyntaxOWLExpressionsDataSetBuilder
// Set the package name and import the required classes
package net.sansa_stack.owl.flink.dataset

import net.sansa_stack.owl.common.parsing.{ManchesterSyntaxExpressionBuilder, ManchesterSyntaxPrefixParsing}
import net.sansa_stack.owl.flink.hadoop.ManchesterSyntaxInputFormat
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.hadoop.io.{LongWritable, Text}

object ManchesterSyntaxOWLExpressionsDataSetBuilder extends ManchesterSyntaxPrefixParsing {
  def build(env: ExecutionEnvironment, filePath: String): OWLExpressionsDataSet = {
    buildAndGetPrefixes(env, filePath)._1
  }

  private[dataset] def buildAndGetPrefixes(env: ExecutionEnvironment,
                                           filePath: String): (OWLExpressionsDataSet, Map[String, String]) = {
    import org.apache.flink.api.scala._

    val hadoopDataSet: DataSet[(LongWritable, Text)] =
      env.readHadoopFile[LongWritable, Text](
        new ManchesterSyntaxInputFormat,
        classOf[LongWritable],
        classOf[Text],
        filePath
      )
    val rawDataSet = hadoopDataSet.map(_._2.toString)

    val tmp: Seq[(String, String)] = rawDataSet.filter(isPrefixDeclaration(_)).map(parsePrefix(_)).collect()
    val prefixes: Map[String, String] = tmp.toMap

    val builder = new ManchesterSyntaxExpressionBuilder(prefixes)

    (rawDataSet.map(builder.clean(_)).filter(_ != null), prefixes)
  }
}
Author: SANSA-Stack, Project: SANSA-OWL, Lines of code: 35, Source file: ManchesterSyntaxOWLExpressionsDataSetBuilder.scala
Example 6: ManchesterSyntaxOWLExpressionsRDDBuilder
// Set the package name and import the required classes
package net.sansa_stack.owl.spark.rdd

import net.sansa_stack.owl.common.parsing.{ManchesterSyntaxExpressionBuilder, ManchesterSyntaxPrefixParsing}
import net.sansa_stack.owl.spark.hadoop.ManchesterSyntaxInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.SparkContext

object ManchesterSyntaxOWLExpressionsRDDBuilder extends ManchesterSyntaxPrefixParsing {
  def build(sc: SparkContext, filePath: String): OWLExpressionsRDD = {
    buildAndGetPrefixes(sc, filePath)._1
  }

  private[spark] def buildAndGetPrefixes(sc: SparkContext, filePath: String): (OWLExpressionsRDD, Map[String, String]) = {
    val rawRDD = sc.hadoopFile(
      filePath,
      classOf[ManchesterSyntaxInputFormat],
      classOf[LongWritable],
      classOf[Text],
      sc.defaultMinPartitions).map(_._2.toString)

    val tmp: Array[(String, String)] =
      rawRDD.filter(isPrefixDeclaration(_)).map(parsePrefix).collect()
    val prefixes: Map[String, String] = tmp.toMap

    val builder = new ManchesterSyntaxExpressionBuilder(prefixes)

    (rawRDD.map(builder.clean(_)).filter(_ != null), prefixes)
  }
}
Author: SANSA-Stack, Project: SANSA-OWL, Lines of code: 30, Source file: ManchesterSyntaxOWLExpressionsRDDBuilder.scala
Example 7: FunctionalSyntaxOWLExpressionsRDDBuilder
// Set the package name and import the required classes
package net.sansa_stack.owl.spark.rdd

import net.sansa_stack.owl.common.parsing.{FunctionalSyntaxExpressionBuilder, FunctionalSyntaxPrefixParsing}
import net.sansa_stack.owl.spark.hadoop.FunctionalSyntaxInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.SparkContext

object FunctionalSyntaxOWLExpressionsRDDBuilder extends Serializable with FunctionalSyntaxPrefixParsing {
  def build(sc: SparkContext, filePath: String): OWLExpressionsRDD = {
    val hadoopRDD = sc.hadoopFile(
      filePath, classOf[FunctionalSyntaxInputFormat], classOf[LongWritable],
      classOf[Text], sc.defaultMinPartitions)
    val rawRDD = hadoopRDD.map(entry => entry._2.toString)

    val tmp: Array[(String, String)] =
      rawRDD.filter(isPrefixDeclaration(_)).map(parsePrefix).collect()
    val prefixes: Map[String, String] = tmp.toMap

    val builder = new FunctionalSyntaxExpressionBuilder(prefixes)

    rawRDD.map(builder.clean(_)).filter(_ != null)
  }
}
Author: SANSA-Stack, Project: SANSA-OWL, Lines of code: 26, Source file: FunctionalSyntaxOWLExpressionsRDDBuilder.scala
Example 8: Converter
// Set the package name and import the required classes
package com.dataoptimo.imgprocessing.convert

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.io.IOUtils
import org.apache.hadoop.io.SequenceFile
import java.io.IOException
import java.lang.IllegalArgumentException

class Converter(conf: Configuration) {

  def imageToSequence(srcPath: String, dstPath: String) {
    try {
      val fs = FileSystem.get(conf)
      val inPath = new Path(srcPath)
      val outPath = new Path(dstPath)
      val key = new Text()
      val value = new BytesWritable()
      val in = fs.open(inPath)
      val buffer = new Array[Byte](in.available())
      in.read(buffer)
      val writer = SequenceFile.createWriter(fs, conf, outPath, key.getClass(), value.getClass())
      writer.append(new Text(inPath.getName()), new BytesWritable(buffer))
      IOUtils.closeStream(writer)
      IOUtils.closeStream(in) // also close the input stream so the file handle is not leaked
    }
    catch {
      case io: IOException => println(io.getMessage)
      case illegalArgument: IllegalArgumentException => println(illegalArgument.getMessage)
    }
  }
}
Author: mfawadalam, Project: imgprocessing, Lines of code: 36, Source file: Converters.scala
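Reading such a sequence file back follows the same Writable pattern in reverse. The sketch below is not part of the project above; it assumes Hadoop 2.x and a hypothetical path (point it at whatever dstPath was passed to imageToSequence):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, SequenceFile, Text}

object SequenceFileReadExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // hypothetical path; use the file written by Converter.imageToSequence
    val reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path("/tmp/images.seq")))
    val key = new Text()
    val value = new BytesWritable()
    try {
      while (reader.next(key, value)) {
        // the key holds the original file name, the value the raw image bytes
        println(s"${key.toString}: ${value.getLength} bytes")
      }
    } finally {
      reader.close()
    }
  }
}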
Example 9: ADAMContextExtensions
// Set the package name and import the required classes
package org.bdgenomics.adam.rdd

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.api.java.StorageLevels
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.converters.FastaConverter
import org.bdgenomics.adam.rdd.contig.NucleotideContigFragmentRDD
import org.bdgenomics.utils.instrumentation.Metrics
import org.apache.spark.rdd.MetricsContext._
import org.bdgenomics.adam.rdd.feature.FeatureRDD
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.formats.avro.Feature

object ADAMContextExtensions {

  implicit class spExt(val sparkContext: SparkContext) extends HDFSFilesExtensions {

    def loadFastaPersistent(
      filePath: String,
      fragmentLength: Long = 10000L): NucleotideContigFragmentRDD = {
      val fastaData: RDD[(LongWritable, Text)] = sparkContext.newAPIHadoopFile(
        filePath,
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text]
      )
      if (Metrics.isRecording) fastaData.instrument() else fastaData

      val remapData = fastaData.map(kv => (kv._1.get, kv._2.toString))

      // convert rdd and cache
      val fragmentRdd = FastaConverter(remapData, fragmentLength)
        .persist(StorageLevels.MEMORY_AND_DISK)

      NucleotideContigFragmentRDD(fragmentRdd)
    }

    def mergeFeatures(features: List[FeatureRDD]): Option[FeatureRDD] = features match {
      case Nil => None
      case head :: Nil => Some(head)
      case head :: tail =>
        val merged = tail.foldLeft(head) {
          case (acc, feature) =>
            val joined = acc.broadcastRegionJoin(feature)
            acc.transform(_ => joined.rdd.map {
              case (one, two) =>
                one.setStart(Math.min(one.getStart, two.getStart))
                one.setEnd(Math.max(one.getEnd, two.getEnd))
                one
            })
        }
        Some(merged)
    }
  }
}
Author: antonkulaga, Project: adam-playground, Lines of code: 62, Source file: ADAMContextExtensions.scala
Example 10: package object trending
// Set the package name and import the required classes
package epam.idobrovolskiy.wikipedia

import epam.idobrovolskiy.wikipedia.trending.time.PlainDatesExtractor
import epam.idobrovolskiy.wikipedia.trending.tokenizer.StopWordsTokenizer
import org.apache.hadoop.io.{IntWritable, Text}

package object trending extends scala.AnyRef {
  val AppName = "wikipedia-trending"

  val DefaultTokenizer = new StopWordsTokenizer
  val TopTokenCount = 10

  val DefaultInputWikiDumpFilename = "wiki_small"
  val DefaultPrepHeaderFilename = "wiki_prep_headers"
  val DefaultPrepFullFilename = "wiki_prep_full"
  val DefaultDateCitationsFileName = "wiki_date_citations"
  val DefaultDateIndexFileName = "wiki_index_dates"
  val DefaultDocIndexFileName = "wiki_index_docs"

  val DefaultTarget = preprocessing.PreprocessingTarget.Stdout
  val DefaultPathForPlainTextExtraction = "./data/out"
  val DefaultWikipediaDumpFilesPath = "./data/in"
  val DefaultPlainTextExtractor = preprocessing.attardi.AttardiPlainTextExtractor

  val HdfsNameNodeHost = "hdfs://sandbox.hortonworks.com:8020"
  val HdfsRootPath = "/user/idobrovolskiy/wikipedia-trending/"

  val PreprocessedFileHeaderBodyDelimiter = "\n\n"

  type PreprocessedSequenceFileKeyType = IntWritable
  type PreprocessedSequenceFileValueType = Text

  val DefaultDatesExtractor = new PlainDatesExtractor

  lazy val spark = common.SparkUtils.sparkSession
}
Author: igor-dobrovolskiy-epam, Project: wikipedia-analysis-scala-core, Lines of code: 36, Source file: package.scala
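The two type aliases above only declare that the preprocessed output is a sequence file keyed by IntWritable with Text values; the package object does not show how it is consumed. Purely as an illustration (the path and app name below are assumptions assembled from the constants above, not code from the project), such a file could be loaded into Spark like this, converting the reused Writables into plain Scala types right away:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}

object ReadPreprocessedSeqFile {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("read-wiki-prep"))
    // assumed location: HdfsRootPath + DefaultPrepFullFilename from the package object above
    val docs = sc.sequenceFile("/user/idobrovolskiy/wikipedia-trending/wiki_prep_full",
        classOf[IntWritable], classOf[Text])
      // Hadoop record readers reuse Writable instances, so copy to immutable values
      // before caching, sorting or collecting
      .map { case (id, body) => (id.get, body.toString) }
    println(docs.count())
    sc.stop()
  }
}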
Example 11: WikiCounter
// Set the package name and import the required classes
package main

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.hadoop.streaming.StreamXmlRecordReader
import org.apache.hadoop.util.StringUtils
import org.apache.spark.{SparkContext, SparkConf}

import scala.xml.XML

object WikiCounter {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Word Counter")
    val sc = new SparkContext(conf)

    val jobConf = new JobConf()
    jobConf.set("stream.recordreader.class",
      "org.apache.hadoop.streaming.StreamXmlRecordReader")
    jobConf.set("stream.recordreader.begin", "<page")
    jobConf.set("stream.recordreader.end", "</page>")
    FileInputFormat.addInputPaths(jobConf,
      "file:///Users/di/Books/enwiki-20160204-pages-articles1.xml-p000000010p000030302")

    val wikiDocuments = sc.hadoopRDD(jobConf,
      classOf[org.apache.hadoop.streaming.StreamInputFormat],
      classOf[Text], classOf[Text])
      .map(_._1.toString)

    val rawWikiPages = wikiDocuments.map(wikiString => {
      val wikiXml = XML.loadString(wikiString)
      (wikiXml \ "revision" \ "text").text
    })

    val tokenizedWikiData = rawWikiPages
      .map(_.replaceAll("[.|,|'|\"|?|)|(|_|0-9]", " ").trim)
      .flatMap(_.split("\\W+"))
      .filter(_.length > 2)

    val sortedByLength = tokenizedWikiData.distinct
      .sortBy(_.length, ascending = false)
      .sample(withReplacement = false, fraction = 0.1)

    sortedByLength.saveAsTextFile("/tmp/wiki_pages")
  }
}
Author: denyago, Project: spark-wordcount, Lines of code: 46, Source file: WikiCounter.scala
Example 12: VoyagerApp
// Set the package name and import the required classes
package org.apress.prospark

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

object VoyagerApp {
  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println(
        "Usage: VoyagerApp <appname> <inputPath> <outputPath>")
      System.exit(1)
    }
    val Seq(appName, inputPath, outputPath) = args.toSeq

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
      .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")

    val ssc = new StreamingContext(conf, Seconds(10))

    val voyager1 = ssc.fileStream[LongWritable, Text, TextInputFormat](inputPath, (f: Path) => true, newFilesOnly = false).map(pair => pair._2.toString)
    voyager1.map(rec => {
      val attrs = rec.split("\\s+")
      ((attrs(0).toInt), attrs.slice(18, 28).map(_.toDouble))
    }).filter(pflux => pflux._2.exists(_ > 1.0)).map(rec => (rec._1, 1))
      .reduceByKey(_ + _)
      .transform(rec => rec.sortByKey(ascending = false, numPartitions = 1)).saveAsTextFiles(outputPath)

    ssc.start()
    ssc.awaitTermination()
  }
}
Author: ZubairNabi, Project: prosparkstreaming, Lines of code: 42, Source file: L4-1Voyager.scala
Example 13: VoyagerAppKryo
// Set the package name and import the required classes
package org.apress.prospark

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

object VoyagerAppKryo {
  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println(
        "Usage: VoyagerAppKryo <appname> <inputPath> <outputPath>")
      System.exit(1)
    }
    val Seq(appName, inputPath, outputPath) = args.toSeq

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[ProtonFlux]))

    val ssc = new StreamingContext(conf, Seconds(10))

    val voyager1 = ssc.fileStream[LongWritable, Text, TextInputFormat](inputPath, (f: Path) => true, newFilesOnly = false).map(pair => pair._2.toString)
    val projected = voyager1.map(rec => {
      val attrs = rec.split("\\s+")
      new ProtonFlux(attrs(0), attrs(18), attrs(19), attrs(20), attrs(21),
        attrs(22), attrs(23), attrs(24), attrs(25), attrs(26), attrs(27),
        attrs(28))
    })
    val filtered = projected.filter(pflux => pflux.isSolarStorm)
    val yearlyBreakdown = filtered.map(rec => (rec.year, 1))
      .reduceByKey(_ + _)
      .transform(rec => rec.sortByKey(ascending = false))
    yearlyBreakdown.saveAsTextFiles(outputPath)

    ssc.start()
    ssc.awaitTermination()
  }
}
Author: ZubairNabi, Project: prosparkstreaming, Lines of code: 48, Source file: L4-4Kryo.scala
Example 14: CollabFilteringPreprocessingApp
// Set the package name and import the required classes
package org.apress.prospark

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

import com.google.common.io.Files

object CollabFilteringPreprocessingApp {

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println(
        "Usage: CollabFilteringPreprocessingApp <appname> <inputpath> <outputpath>")
      System.exit(1)
    }
    val Seq(appName, iPath, oPath) = args.toSeq

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)

    val delim = " "

    val sc = new SparkContext(conf)
    sc.hadoopFile(iPath, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], sc.defaultMinPartitions)
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((iSplit, iter) =>
        iter.map(splitAndLine => (Files.getNameWithoutExtension(iSplit.asInstanceOf[FileSplit].getPath.toString), splitAndLine._2.toString.split(" ")(1))))
      .filter(r => r._2 != "0")
      .map(r => ((r._1, r._2), 1))
      .reduceByKey(_ + _)
      .map(r => r._1._1.replace("subject", "") + delim + r._1._2 + delim + r._2)
      .sample(false, 0.7)
      .coalesce(1)
      .saveAsTextFile(oPath)
  }
}
Author: ZubairNabi, Project: prosparkstreaming, Lines of code: 44, Source file: L9-11CollabFilteringPreprocessing.scala
Example 15: RedditAggregationApp
// Set the package name and import the required classes
package org.apress.prospark

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Milliseconds, Seconds, StreamingContext }
import org.apache.hadoop.io.{ Text, LongWritable, IntWritable }
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.dstream.DStream
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{ TextOutputFormat => NewTextOutputFormat }
import org.apache.spark.streaming.dstream.PairDStreamFunctions
import org.apache.log4j.LogManager
import org.json4s._
import org.json4s.native.JsonMethods._
import java.text.SimpleDateFormat
import java.util.Date

object RedditAggregationApp {
  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println(
        "Usage: RedditAggregationApp <appname> <input_path>")
      System.exit(1)
    }
    val Seq(appName, inputPath) = args.toSeq
    val LOG = LogManager.getLogger(this.getClass)

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)

    val ssc = new StreamingContext(conf, Seconds(1))
    LOG.info("Started at %d".format(ssc.sparkContext.startTime))

    val comments = ssc.fileStream[LongWritable, Text, TextInputFormat](inputPath, (f: Path) => true, newFilesOnly = false).map(pair => pair._2.toString)

    val recCount = comments.count()

    val recCountValue = comments.countByValue()

    val totalWords = comments.map(rec => ((parse(rec) \ "body").values.toString))
      .flatMap(body => body.split(" "))
      .map(word => 1)
      .reduce(_ + _)

    ssc.start()
    ssc.awaitTermination()
  }
}
Author: ZubairNabi, Project: prosparkstreaming, Lines of code: 52, Source file: L3-DStreamAggregation.scala
Example 16: StreamingTranslateApp
// Set the package name and import the required classes
package org.apress.prospark

import scala.io.Source

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.Text

object StreamingTranslateApp {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println(
        "Usage: StreamingTranslateApp <appname> <book_path> <output_path> <language>")
      System.exit(1)
    }
    val Seq(appName, bookPath, outputPath, lang) = args.toSeq

    val dict = getDictionary(lang)

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
    val ssc = new StreamingContext(conf, Seconds(1))

    val book = ssc.textFileStream(bookPath)
    val translated = book.map(line => line.split("\\s+").map(word => dict.getOrElse(word, word)).mkString(" "))
    translated.saveAsTextFiles(outputPath)

    ssc.start()
    ssc.awaitTermination()
  }

  def getDictionary(lang: String): Map[String, String] = {
    if (!Set("German", "French", "Italian", "Spanish").contains(lang)) {
      System.err.println(
        "Unsupported language: %s".format(lang))
      System.exit(1)
    }
    val url = "http://www.june29.com/IDP/files/%s.txt".format(lang)
    println("Grabbing dictionary from: %s".format(url))
    Source.fromURL(url, "ISO-8859-1").mkString
      .split("\\r?\\n")
      .filter(line => !line.startsWith("#"))
      .map(line => line.split("\\t"))
      .map(tkns => (tkns(0).trim, tkns(1).trim)).toMap
  }
}
Author: ZubairNabi, Project: prosparkstreaming, Lines of code: 53, Source file: L3-1DStreams.scala
Example 17: FrequencyMapReducer
// Set the package name and import the required classes
package com.argcv.iphigenia.example.hdfs.mr

import com.argcv.valhalla.console.ColorForConsole._
import com.argcv.valhalla.utils.Awakable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ IntWritable, Text }
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

object FrequencyMapReducer extends Awakable {

  def main(args: Array[String]): Unit = {
    // create a hadoop job and set main class
    val job = Job.getInstance()
    job.setJarByClass(FrequencyMapReducer.getClass)
    job.setJobName("Frequency")

    // set the input & output path
    FileInputFormat.addInputPath(job, new Path(args.head))
    FileOutputFormat.setOutputPath(job, new Path(s"${args(1)}-${System.currentTimeMillis()}"))

    // set mapper & reducer
    job.setMapperClass(FrequencyMapper.instance)
    job.setReducerClass(FrequencyReducer.instance)

    // specify the type of the output
    job.setOutputKeyClass(new Text().getClass)
    job.setOutputValueClass(new IntWritable().getClass)

    // run
    logger.info(s"job finished, status [${if (job.waitForCompletion(true)) "OK".withColor(GREEN) else "FAILED".withColor(RED)}]")
  }
}
Author: yuikns, Project: iphigenia, Lines of code: 37, Source file: FrequencyMapReducer.scala
Example 18: FrequencyMapper
// Set the package name and import the required classes
package com.argcv.iphigenia.example.hdfs.mr

import org.apache.hadoop.io.{ IntWritable, LongWritable, Text }
import org.apache.hadoop.mapreduce.Mapper

class FrequencyMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
  type Context = Mapper[LongWritable, Text, Text, IntWritable]#Context

  override def map(offset: LongWritable, lineText: Text, context: Context): Unit = {
    val line = lineText.toString
    val eventID: String = line.split(",")(1)
    context.write(new Text(eventID), FrequencyMapper.ONE)
  }
}

object FrequencyMapper {
  def instance = new FrequencyMapper().getClass

  lazy val ONE = new IntWritable(1)
}
Author: yuikns, Project: iphigenia, Lines of code: 22, Source file: FrequencyMapper.scala
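Examples 3 and 17 also wire a FrequencyReducer into the job, but its source is not listed on this page. Purely as a hedged sketch of what a compatible reducer could look like (the actual project code may differ), it would sum the ones emitted by FrequencyMapper for each Text key:

import org.apache.hadoop.io.{ IntWritable, Text }
import org.apache.hadoop.mapreduce.Reducer

import scala.collection.JavaConverters._

class FrequencyReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
  type Context = Reducer[Text, IntWritable, Text, IntWritable]#Context

  override def reduce(eventID: Text, counts: java.lang.Iterable[IntWritable], context: Context): Unit = {
    // sum the per-record ones emitted by FrequencyMapper for this event id
    val total = counts.asScala.foldLeft(0)(_ + _.get)
    context.write(eventID, new IntWritable(total))
  }
}

object FrequencyReducer {
  // mirrors FrequencyMapper.instance: the driver only needs the Class object
  def instance = new FrequencyReducer().getClass
}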
Example 19: ParseWikipediaDump
// Set the package name and import the required classes
package it.agilelab.bigdata.spark.search.examples

import java.io.ByteArrayInputStream

import scala.io.BufferedSource
import scala.xml.pull._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.io.{LongWritable, Text}
import com.databricks.spark.xml.XmlInputFormat
import it.agilelab.bigdata.spark.search.evaluation.utils.wikipage

object ParseWikipediaDump {
  // NOTE: the listing on this page is truncated. Everything between the imports and the
  // `event match` block below (the object/method declarations, the Spark setup and the
  // XML pull-parser loop with its state variables) is a reconstruction with assumed names,
  // not the project's original code.
  def main(args: Array[String]): Unit = {
    val Array(inputPath, outputPath) = args
    val sc = new SparkContext(new SparkConf().setAppName("ParseWikipediaDump"))

    // read one <page>...</page> element per record using spark-xml's XmlInputFormat
    sc.hadoopConfiguration.set(XmlInputFormat.START_TAG_KEY, "<page>")
    sc.hadoopConfiguration.set(XmlInputFormat.END_TAG_KEY, "</page>")
    val records = sc.newAPIHadoopFile(inputPath, classOf[XmlInputFormat],
      classOf[LongWritable], classOf[Text]).map(_._2.toString)

    // pull-parse a single <page> element into a (title, text) tuple
    def parsePage(page: String): (String, String) = {
      val xml = new XMLEventReader(new BufferedSource(new ByteArrayInputStream(page.getBytes("UTF-8"))))
      var inPage, inTitle, inText, pageReady = false
      var title, text = ""
      while (xml.hasNext && !pageReady) {
        val event = xml.next()
        event match {
          case event: EvElemStart => {
            val castedEvent = event.asInstanceOf[EvElemStart]
            if (castedEvent.label.equalsIgnoreCase("page")) {
              inPage = true
            } else if (inPage && castedEvent.label.equalsIgnoreCase("title")) {
              inTitle = true
            } else if (inPage && castedEvent.label.equalsIgnoreCase("text")) {
              inText = true
            }
          }
          case event: EvText => {
            val castedEvent = event.asInstanceOf[EvText]
            if (inTitle) {
              title = castedEvent.text
            } else if (inText) {
              text = if (text == "") castedEvent.text else text + "\"" + castedEvent.text
            }
          }
          case event: EvElemEnd => {
            val castedEvent = event.asInstanceOf[EvElemEnd]
            if (castedEvent.label.equalsIgnoreCase("page")) {
              inPage = false
              pageReady = true
            } else if (inPage && castedEvent.label.equalsIgnoreCase("title")) {
              inTitle = false
            } else if (inPage && castedEvent.label.equalsIgnoreCase("text")) {
              inText = false
            }
          }
          case _ => ;
        }
      }
      (title, text)
    }

    // parse records into tuples
    val wikipedia = records map { parsePage(_) }
    // map tuple into wikipage
    val wikipages = wikipedia map { case (title, text) => wikipage(title, text) }
    // save as object file of wikipage
    wikipages.coalesce(50).saveAsObjectFile(outputPath)
  }
}
Author: agile-lab-dev, Project: sparksearchengine, Lines of code: 60, Source file: ParseWikipediaDump.scala
Example 20: AutoTainterTest
// Set the package name and import the required classes
package edu.anonymous.cs.dft

import java.io.File

import edu.columbia.cs.psl.phosphor.runtime.Tainter
import edu.anonymous.cs.dft.tracker.{FullAutoTainter, TextAutoTainter}
import org.apache.hadoop.io.Text

object AutoTainterTest {

  def main(args: Array[String]): Unit = {
    val file = "/tmp/autoTainter.conf"
    val testString = "max 23 male"
    val autoTainter = if (new File(file).exists) {
      new TextAutoTainter(file)
    } else {
      new FullAutoTainter
    }
    autoTainter.initEngine()
    val taintedString = autoTainter.setTaint(testString)
    val name = taintedString.substring(0, 3)
    val age = taintedString.substring(4, 6)
    val gender = taintedString.substring(7, 9)
    val a = new Text
    a.set(age)
    val b = a.toString.toInt
    assert(Tainter.getTaint(age) == 2)
    assert(Tainter.getTaint(name) == 1)
    assert(Tainter.getTaint(gender) == 3)
    println(taintedString)
  }
}
Author: acsac17-p78, Project: kakute, Lines of code: 36, Source file: AutoTainterTest.scala
Note: The org.apache.hadoop.io.Text examples in this article were collected from source-code and documentation hosting platforms such as GitHub and MSDocs. The code snippets are taken from open-source projects contributed by various developers; copyright of the source code remains with the original authors, and distribution and use are governed by the corresponding projects' licenses. Do not republish without permission.