
Scala Text Class Code Examples


This article collects typical usage examples of the org.apache.hadoop.io.Text class in Scala. If you are wondering what the Text class is for, how to use it, or what real-world code that uses it looks like, the curated examples below should help.

Below are 20 code examples of the Text class, sorted by popularity by default. Upvote the examples you like or find useful; your ratings help the system recommend better Scala code examples.
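Before the project-level examples, here is a minimal standalone sketch of the Text API itself. It is not taken from any of the projects below and assumes only hadoop-common on the classpath; Text is a mutable Writable that stores a string as UTF-8 bytes:

import org.apache.hadoop.io.Text

object TextBasics {
  def main(args: Array[String]): Unit = {
    val t = new Text("hello")  // construct from a String (stored internally as UTF-8 bytes)
    t.set("hello, world")      // Text is mutable: one instance can be reused across records
    println(t.getLength)       // 12 -- length in bytes, not characters
    println(t.toString)        // decode back to a java.lang.String
    println(t.find("world"))   // 7 -- byte offset of a substring, or -1 if absent
    val copy = new Text(t)     // the copy constructor performs a deep copy of the bytes
    println(copy == t)         // true: equality compares the underlying byte contents
  }
}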

Example 1: FPMiningPreprocessingApp

// Package declaration and dependency imports
package org.apress.prospark

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

import com.google.common.io.Files

object FPMiningPreprocessingApp {

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println(
        "Usage: FPMiningPreprocessingApp <appname> <inputpath> <outputpath>")
      System.exit(1)
    }
    val Seq(appName, iPath, oPath) = args.toSeq

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)

    val delim = " "

    val sc = new SparkContext(conf)
    sc.hadoopFile(iPath, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], sc.defaultMinPartitions)
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((iSplit, iter) =>
        iter.map(splitAndLine => (Files.getNameWithoutExtension(iSplit.asInstanceOf[FileSplit].getPath.toString), splitAndLine._2.toString.split(" ")(1))))
      .filter(r => r._2 != "0")
      .map(r => (r._1, r._2))
      .distinct()
      .groupByKey()
      .map(r => r._2.mkString(" "))
      .sample(false, 0.7)
      .coalesce(1)
      .saveAsTextFile(oPath)
  }
} 
Developer: ZubairNabi, Project: prosparkstreaming, Lines: 45, Source: L9-13FPMiningPreprocessing.scala


Example 2: RedditVariationApp

// Package declaration and dependency imports
package org.apress.prospark

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Milliseconds, Seconds, StreamingContext }
import org.apache.hadoop.io.{ Text, LongWritable, IntWritable }
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.dstream.DStream
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{ TextOutputFormat => NewTextOutputFormat }
import org.apache.spark.streaming.dstream.PairDStreamFunctions
import org.apache.log4j.LogManager
import org.json4s._
import org.json4s.native.JsonMethods._
import java.text.SimpleDateFormat
import java.util.Date

object RedditVariationApp {
  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println(
        "Usage: RedditVariationApp <appname> <input_path>")
      System.exit(1)
    }
    val Seq(appName, inputPath) = args.toSeq
    val LOG = LogManager.getLogger(this.getClass)

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)

    val ssc = new StreamingContext(conf, Seconds(1))
    LOG.info("Started at %d".format(ssc.sparkContext.startTime))

    val comments = ssc.fileStream[LongWritable, Text, TextInputFormat](inputPath, (f: Path) => true, newFilesOnly = false).map(pair => pair._2.toString)

    // the following transformations illustrate common DStream variations: union merges
    // two streams, repartition changes batch parallelism, and glom collects each
    // partition into an array (here used to find the oldest comment per partition)
    val merged = comments.union(comments)

    val repartitionedComments = comments.repartition(4)

    val rddMin = comments.glom().map(arr =>
      arr.minBy(rec => (parse(rec) \ "created_utc").values.toString.toInt))

    // a streaming job needs at least one registered output operation before start()
    rddMin.print()

    ssc.start()
    ssc.awaitTermination()

  }
} 
Developer: ZubairNabi, Project: prosparkstreaming, Lines: 50, Source: L3-DStreamVariation.scala


Example 3: FrequencyMapReducer

// Package declaration and dependency imports
package com.argcv.cse8803.mapreducebasic

import com.argcv.valhalla.console.ColorForConsole._
import com.argcv.valhalla.utils.Awakable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat


object FrequencyMapReducer extends Awakable {

  def main(args: Array[String]): Unit = {
    // create a hadoop job and set main class
    val job = Job.getInstance()
    job.setJarByClass(FrequencyMapReducer.getClass)
    job.setJobName("Frequency")

    // set the input & output path
    FileInputFormat.addInputPath(job, new Path(args.head))
    FileOutputFormat.setOutputPath(job, new Path(s"${args(1)}-${System.currentTimeMillis()}"))

    // set mapper & reducer
    job.setMapperClass(FrequencyMapper.instance)
    job.setReducerClass(FrequencyReducer.instance)

    // specify the type of the output
    job.setOutputKeyClass(new Text().getClass)
    job.setOutputValueClass(new IntWritable().getClass)

    // run
    logger.info(s"job finished, status [${if (job.waitForCompletion(true)) "OK".withColor(GREEN) else "FAILED".withColor(RED)}]")
  }

} 
Developer: yuikns, Project: cse8803, Lines: 37, Source: FrequencyMapReducer.scala


Example 4: FunctionalSyntaxOWLExpressionsDataSetBuilder

// Package declaration and dependency imports
package net.sansa_stack.owl.flink.dataset

import net.sansa_stack.owl.common.parsing.{FunctionalSyntaxExpressionBuilder, FunctionalSyntaxPrefixParsing}
import net.sansa_stack.owl.flink.hadoop.FunctionalSyntaxInputFormat
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.hadoop.io.{LongWritable, Text}


object FunctionalSyntaxOWLExpressionsDataSetBuilder extends FunctionalSyntaxPrefixParsing {
  def build(env: ExecutionEnvironment, filePath: String): OWLExpressionsDataSet = {
    import org.apache.flink.api.scala._
    val hadoopDataSet: DataSet[(LongWritable, Text)] =
      env.readHadoopFile[LongWritable, Text](
        new FunctionalSyntaxInputFormat,
        classOf[LongWritable],
        classOf[Text],
        filePath
      )
    val rawDataSet = hadoopDataSet.map(_._2.toString)

    val tmp: Seq[(String, String)] = rawDataSet.filter(isPrefixDeclaration(_)).map(parsePrefix(_)).collect()
    val prefixes: Map[String, String] = tmp.toMap

    val builder = new FunctionalSyntaxExpressionBuilder(prefixes)

    rawDataSet.map(builder.clean(_)).filter(_ != null)
  }

} 
Developer: SANSA-Stack, Project: SANSA-OWL, Lines: 30, Source: FunctionalSyntaxOWLExpressionsDataSetBuilder.scala


Example 5: ManchesterSyntaxOWLExpressionsDataSetBuilder

// Package declaration and dependency imports
package net.sansa_stack.owl.flink.dataset

import net.sansa_stack.owl.common.parsing.{ManchesterSyntaxExpressionBuilder, ManchesterSyntaxPrefixParsing}
import net.sansa_stack.owl.flink.hadoop.ManchesterSyntaxInputFormat
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.hadoop.io.{LongWritable, Text}


object ManchesterSyntaxOWLExpressionsDataSetBuilder extends ManchesterSyntaxPrefixParsing {
  def build(env: ExecutionEnvironment, filePath: String): OWLExpressionsDataSet = {
    buildAndGetPrefixes(env, filePath)._1
  }

  private[dataset] def buildAndGetPrefixes(env: ExecutionEnvironment,
      filePath: String): (OWLExpressionsDataSet, Map[String, String]) = {

    import org.apache.flink.api.scala._
    val hadoopDataSet: DataSet[(LongWritable, Text)] =
      env.readHadoopFile[LongWritable, Text](
        new ManchesterSyntaxInputFormat,
        classOf[LongWritable],
        classOf[Text],
        filePath
      )
    val rawDataSet = hadoopDataSet.map(_._2.toString)

    val tmp: Seq[(String, String)] = rawDataSet.filter(isPrefixDeclaration(_)).map(parsePrefix(_)).collect()
    val prefixes: Map[String, String] = tmp.toMap

    val builder = new ManchesterSyntaxExpressionBuilder(prefixes)

    (rawDataSet.map(builder.clean(_)).filter(_ != null), prefixes)
  }
} 
Developer: SANSA-Stack, Project: SANSA-OWL, Lines: 35, Source: ManchesterSyntaxOWLExpressionsDataSetBuilder.scala


Example 6: ManchesterSyntaxOWLExpressionsRDDBuilder

// Package declaration and dependency imports
package net.sansa_stack.owl.spark.rdd

import net.sansa_stack.owl.common.parsing.{ManchesterSyntaxExpressionBuilder, ManchesterSyntaxPrefixParsing}
import net.sansa_stack.owl.spark.hadoop.ManchesterSyntaxInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.SparkContext


object ManchesterSyntaxOWLExpressionsRDDBuilder extends ManchesterSyntaxPrefixParsing {
  def build(sc: SparkContext, filePath: String): OWLExpressionsRDD = {
    buildAndGetPrefixes(sc, filePath)._1
  }

  private[spark] def buildAndGetPrefixes(sc: SparkContext, filePath: String): (OWLExpressionsRDD, Map[String, String]) = {
    val rawRDD = sc.hadoopFile(
      filePath,
      classOf[ManchesterSyntaxInputFormat],
      classOf[LongWritable],
      classOf[Text],
      sc.defaultMinPartitions).map(_._2.toString)

    val tmp: Array[(String, String)] =
      rawRDD.filter(isPrefixDeclaration(_)).map(parsePrefix).collect()
    val prefixes: Map[String, String] = tmp.toMap

    val builder = new ManchesterSyntaxExpressionBuilder(prefixes)
    (rawRDD.map(builder.clean(_)).filter(_ != null), prefixes)
  }
} 
Developer: SANSA-Stack, Project: SANSA-OWL, Lines: 30, Source: ManchesterSyntaxOWLExpressionsRDDBuilder.scala


Example 7: FunctionalSyntaxOWLExpressionsRDDBuilder

// Package declaration and dependency imports
package net.sansa_stack.owl.spark.rdd

import net.sansa_stack.owl.common.parsing.{FunctionalSyntaxExpressionBuilder, FunctionalSyntaxPrefixParsing}
import net.sansa_stack.owl.spark.hadoop.FunctionalSyntaxInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.SparkContext


object FunctionalSyntaxOWLExpressionsRDDBuilder extends Serializable with FunctionalSyntaxPrefixParsing {
  def build(sc: SparkContext, filePath: String): OWLExpressionsRDD = {
    val hadoopRDD = sc.hadoopFile(
      filePath, classOf[FunctionalSyntaxInputFormat], classOf[LongWritable],
      classOf[Text], sc.defaultMinPartitions)

    val rawRDD = hadoopRDD.map(entry => entry._2.toString)

    val tmp: Array[(String, String)] =
          rawRDD.filter(isPrefixDeclaration(_)).map(parsePrefix).collect()
    val prefixes: Map[String, String] = tmp.toMap

    val builder = new FunctionalSyntaxExpressionBuilder(prefixes)

    rawRDD.map(builder.clean(_)).filter(_ != null)
  }
} 
Developer: SANSA-Stack, Project: SANSA-OWL, Lines: 26, Source: FunctionalSyntaxOWLExpressionsRDDBuilder.scala


Example 8: Converter

// Package declaration and dependency imports
package com.dataoptimo.imgprocessing.convert
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.io.IOUtils
import org.apache.hadoop.io.SequenceFile
import java.io.IOException
import java.lang.IllegalArgumentException

class Converter(conf: Configuration) {

  def imageToSequence(srcPath: String, dstPath: String): Unit = {
    try {
      val fs = FileSystem.get(conf)
      val inPath = new Path(srcPath)
      val outPath = new Path(dstPath)
      val key = new Text()
      val value = new BytesWritable()
      // read the whole image into memory
      // (note: sizing the buffer with available() is fragile for large remote files; kept from the original)
      val in = fs.open(inPath)
      val buffer = new Array[Byte](in.available())
      in.read(buffer)
      in.close()
      // write a single (file name -> raw bytes) pair into a SequenceFile
      val writer = SequenceFile.createWriter(fs, conf, outPath, key.getClass(), value.getClass())
      writer.append(new Text(inPath.getName()), new BytesWritable(buffer))
      IOUtils.closeStream(writer)
    } catch {
      case io: IOException => println(io.getMessage)
      case illegalArgument: IllegalArgumentException => println(illegalArgument.getMessage)
    }
  }
} 
Developer: mfawadalam, Project: imgprocessing, Lines: 36, Source: Converters.scala
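As a usage follow-up, here is a minimal sketch (not part of the original project; the object name and path argument are illustrative assumptions) of reading such a SequenceFile back, pairing Text keys with BytesWritable values:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, SequenceFile, Text}

object SequenceFileDump {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // open a SequenceFile such as the one written by Converter.imageToSequence
    val reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(args(0))))
    val key = new Text()
    val value = new BytesWritable()
    // next() refills the reusable key/value instances until the file is exhausted
    while (reader.next(key, value)) {
      println(s"${key.toString}: ${value.getLength} bytes")
    }
    reader.close()
  }
}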


Example 9: ADAMContextExtensions

// Package declaration and dependency imports
package org.bdgenomics.adam.rdd

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.api.java.StorageLevels
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.converters.FastaConverter
import org.bdgenomics.adam.rdd.contig.NucleotideContigFragmentRDD
import org.bdgenomics.utils.instrumentation.Metrics
import org.apache.spark.rdd.MetricsContext._
import org.bdgenomics.adam.rdd.feature.FeatureRDD
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.formats.avro.Feature


object ADAMContextExtensions {

  implicit class spExt(val sparkContext: SparkContext) extends HDFSFilesExtensions{

    def loadFastaPersistent(
                   filePath: String,
                   fragmentLength: Long = 10000L): NucleotideContigFragmentRDD = {
      val fastaData: RDD[(LongWritable, Text)] = sparkContext.newAPIHadoopFile(
        filePath,
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text]
      )
      if (Metrics.isRecording) fastaData.instrument() else fastaData

      val remapData = fastaData.map(kv => (kv._1.get, kv._2.toString))

      // convert rdd and cache
      val fragmentRdd = FastaConverter(remapData, fragmentLength)
        .persist(StorageLevels.MEMORY_AND_DISK)

      NucleotideContigFragmentRDD(fragmentRdd)
    }

    def mergeFeatures(features: List[FeatureRDD]): Option[FeatureRDD] = features match {
      case Nil => None
      case head :: Nil => Some(head)
      case head :: tail =>
        val merged = tail.foldLeft(head){
          case (acc, feature) =>
            val joined = acc.broadcastRegionJoin(feature)
            acc.transform(_ => joined.rdd.map{
              case (one, two) =>
                one.setStart(Math.min(one.getStart, two.getStart))
                one.setEnd(Math.max(one.getEnd, two.getEnd))
                one
            })
        }
        Some(merged)
    }

  }


} 
Developer: antonkulaga, Project: adam-playground, Lines: 62, Source: ADAMContextExtensions.scala


Example 10:

// Package declaration and dependency imports
package epam.idobrovolskiy.wikipedia

import epam.idobrovolskiy.wikipedia.trending.time.PlainDatesExtractor
import epam.idobrovolskiy.wikipedia.trending.tokenizer.StopWordsTokenizer
import org.apache.hadoop.io.{IntWritable, Text}

package object trending extends scala.AnyRef {
  val AppName = "wikipedia-trending"

  val DefaultTokenizer = new StopWordsTokenizer
  val TopTokenCount = 10

  val DefaultInputWikiDumpFilename = "wiki_small"
  val DefaultPrepHeaderFilename = "wiki_prep_headers"
  val DefaultPrepFullFilename = "wiki_prep_full"
  val DefaultDateCitationsFileName = "wiki_date_citations"
  val DefaultDateIndexFileName = "wiki_index_dates"
  val DefaultDocIndexFileName = "wiki_index_docs"

  val DefaultTarget = preprocessing.PreprocessingTarget.Stdout
  val DefaultPathForPlainTextExtraction = "./data/out"
  val DefaultWikipediaDumpFilesPath = "./data/in"
  val DefaultPlainTextExtractor = preprocessing.attardi.AttardiPlainTextExtractor

  val HdfsNameNodeHost = "hdfs://sandbox.hortonworks.com:8020"
  val HdfsRootPath = "/user/idobrovolskiy/wikipedia-trending/"

  val PreprocessedFileHeaderBodyDelimiter = "\n\n"
  type PreprocessedSequenceFileKeyType = IntWritable
  type PreprocessedSequenceFileValueType = Text

  val DefaultDatesExtractor = new PlainDatesExtractor

  lazy val spark = common.SparkUtils.sparkSession
} 
Developer: igor-dobrovolskiy-epam, Project: wikipedia-analysis-scala-core, Lines: 36, Source: package.scala


Example 11: WikiCounter

// Package declaration and dependency imports
package main

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.hadoop.streaming.StreamXmlRecordReader
import org.apache.hadoop.util.StringUtils
import org.apache.spark.{SparkContext, SparkConf}

import scala.xml.XML

object WikiCounter {
  def main(args: Array[String]) {
    val conf    = new SparkConf().setAppName("Word Counter")
    val sc      = new SparkContext(conf)
    val jobConf = new JobConf()

    jobConf.set("stream.recordreader.class",
      "org.apache.hadoop.streaming.StreamXmlRecordReader")
    jobConf.set("stream.recordreader.begin", "<page")
    jobConf.set("stream.recordreader.end", "</page>")
    FileInputFormat.addInputPaths(jobConf,
      "file:///Users/di/Books/enwiki-20160204-pages-articles1.xml-p000000010p000030302")

    val wikiDocuments = sc.hadoopRDD(jobConf,
      classOf[org.apache.hadoop.streaming.StreamInputFormat],
      classOf[Text], classOf[Text])
        .map(_._1.toString)

    val rawWikiPages = wikiDocuments.map(wikiString => {
      val wikiXml = XML.loadString(wikiString)
      (wikiXml \ "revision" \ "text").text
    })

    val tokenizedWikiData = rawWikiPages
      .map(_.replaceAll("[.|,|'|\"|?|)|(|_|0-9]", " ").trim)
      .flatMap(_.split("\\W+"))
      .filter(_.length > 2)

    val sortedByLength = tokenizedWikiData.distinct
      .sortBy(_.length, ascending = false)
      .sample(withReplacement = false, fraction = 0.1)

    sortedByLength.saveAsTextFile("/tmp/wiki_pages")
  }
} 
Developer: denyago, Project: spark-wordcount, Lines: 46, Source: WikiCounter.scala


Example 12: VoyagerApp

// Package declaration and dependency imports
package org.apress.prospark

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

object VoyagerApp {
  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println(
        "Usage: VoyagerApp <appname> <inputPath> <outputPath>")
      System.exit(1)
    }
    val Seq(appName, inputPath, outputPath) = args.toSeq

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
      .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")

    val ssc = new StreamingContext(conf, Seconds(10))

    val voyager1 = ssc.fileStream[LongWritable, Text, TextInputFormat](inputPath, (f: Path) => true, newFilesOnly = false).map(pair => pair._2.toString)
    voyager1.map(rec => {
      val attrs = rec.split("\\s+")
      ((attrs(0).toInt), attrs.slice(18, 28).map(_.toDouble))
    }).filter(pflux => pflux._2.exists(_ > 1.0)).map(rec => (rec._1, 1))
      .reduceByKey(_ + _)
      .transform(rec => rec.sortByKey(ascending = false, numPartitions = 1)).saveAsTextFiles(outputPath)

    ssc.start()
    ssc.awaitTermination()
  }
} 
Developer: ZubairNabi, Project: prosparkstreaming, Lines: 42, Source: L4-1Voyager.scala


Example 13: VoyagerAppKryo

// Package declaration and dependency imports
package org.apress.prospark

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions

object VoyagerAppKryo {
  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println(
        "Usage: VoyagerAppKryo <appname> <inputPath> <outputPath>")
      System.exit(1)
    }
    val Seq(appName, inputPath, outputPath) = args.toSeq

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[ProtonFlux]))

    val ssc = new StreamingContext(conf, Seconds(10))

    val voyager1 = ssc.fileStream[LongWritable, Text, TextInputFormat](inputPath, (f: Path) => true, newFilesOnly = false).map(pair => pair._2.toString)
    val projected = voyager1.map(rec => {
      val attrs = rec.split("\\s+")
      new ProtonFlux(attrs(0), attrs(18), attrs(19), attrs(20), attrs(21),
        attrs(22), attrs(23), attrs(24), attrs(25), attrs(26), attrs(27),
        attrs(28))
    })
    val filtered = projected.filter(pflux => pflux.isSolarStorm)
    val yearlyBreakdown = filtered.map(rec => (rec.year, 1))
      .reduceByKey(_ + _)
      .transform(rec => rec.sortByKey(ascending = false))
    yearlyBreakdown.saveAsTextFiles(outputPath)

    ssc.start()
    ssc.awaitTermination()
  }
} 
Developer: ZubairNabi, Project: prosparkstreaming, Lines: 48, Source: L4-4Kryo.scala


Example 14: CollabFilteringPreprocessingApp

// Package declaration and dependency imports
package org.apress.prospark

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

import com.google.common.io.Files

object CollabFilteringPreprocessingApp {

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println(
        "Usage: CollabFilteringPreprocessingApp <appname> <inputpath> <outputpath>")
      System.exit(1)
    }
    val Seq(appName, iPath, oPath) = args.toSeq

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)

    val delim = " "

    val sc = new SparkContext(conf)
    sc.hadoopFile(iPath, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], sc.defaultMinPartitions)
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((iSplit, iter) =>
        iter.map(splitAndLine => (Files.getNameWithoutExtension(iSplit.asInstanceOf[FileSplit].getPath.toString), splitAndLine._2.toString.split(" ")(1))))
      .filter(r => r._2 != "0")
      .map(r => ((r._1, r._2), 1))
      .reduceByKey(_ + _)
      .map(r => r._1._1.replace("subject", "") + delim + r._1._2 + delim + r._2)
      .sample(false, 0.7)
      .coalesce(1)
      .saveAsTextFile(oPath)
  }
} 
Developer: ZubairNabi, Project: prosparkstreaming, Lines: 44, Source: L9-11CollabFilteringPreprocessing.scala


Example 15: RedditAggregationApp

// Package declaration and dependency imports
package org.apress.prospark

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Milliseconds, Seconds, StreamingContext }
import org.apache.hadoop.io.{ Text, LongWritable, IntWritable }
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.dstream.DStream
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{ TextOutputFormat => NewTextOutputFormat }
import org.apache.spark.streaming.dstream.PairDStreamFunctions
import org.apache.log4j.LogManager
import org.json4s._
import org.json4s.native.JsonMethods._
import java.text.SimpleDateFormat
import java.util.Date

object RedditAggregationApp {
  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println(
        "Usage: RedditAggregationApp <appname> <input_path>")
      System.exit(1)
    }
    val Seq(appName, inputPath) = args.toSeq
    val LOG = LogManager.getLogger(this.getClass)

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)

    val ssc = new StreamingContext(conf, Seconds(1))
    LOG.info("Started at %d".format(ssc.sparkContext.startTime))

    val comments = ssc.fileStream[LongWritable, Text, TextInputFormat](inputPath, (f: Path) => true, newFilesOnly = false).map(pair => pair._2.toString)

    // the following aggregations illustrate DStream operators: count and countByValue
    // produce per-batch counts, and the map/flatMap/reduce chain totals the words
    val recCount = comments.count()

    val recCountValue = comments.countByValue()

    val totalWords = comments.map(rec => (parse(rec) \ "body").values.toString)
      .flatMap(body => body.split(" "))
      .map(word => 1)
      .reduce(_ + _)

    // a streaming job needs at least one registered output operation before start()
    totalWords.print()

    ssc.start()
    ssc.awaitTermination()

  }
} 
Developer: ZubairNabi, Project: prosparkstreaming, Lines: 52, Source: L3-DStreamAggregation.scala


Example 16: StreamingTranslateApp

// Package declaration and dependency imports
package org.apress.prospark

import scala.io.Source
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.Text

object StreamingTranslateApp {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println(
        "Usage: StreamingTranslateApp <appname> <book_path> <output_path> <language>")
      System.exit(1)
    }
    val Seq(appName, bookPath, outputPath, lang) = args.toSeq

    val dict = getDictionary(lang)

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)
    val ssc = new StreamingContext(conf, Seconds(1))

    val book = ssc.textFileStream(bookPath)
    val translated = book.map(line => line.split("\\s+").map(word => dict.getOrElse(word, word)).mkString(" "))
    translated.saveAsTextFiles(outputPath)

    ssc.start()
    ssc.awaitTermination()
  }

  def getDictionary(lang: String): Map[String, String] = {
    if (!Set("German", "French", "Italian", "Spanish").contains(lang)) {
      System.err.println(
        "Unsupported language: %s".format(lang))
      System.exit(1)
    }
    val url = "http://www.june29.com/IDP/files/%s.txt".format(lang)
    println("Grabbing dictionary from: %s".format(url))
    Source.fromURL(url, "ISO-8859-1").mkString
      .split("\\r?\\n")
      .filter(line => !line.startsWith("#"))
      .map(line => line.split("\\t"))
      .map(tkns => (tkns(0).trim, tkns(1).trim)).toMap
  }

} 
Developer: ZubairNabi, Project: prosparkstreaming, Lines: 53, Source: L3-1DStreams.scala


Example 17: FrequencyMapReducer

// Package declaration and dependency imports
package com.argcv.iphigenia.example.hdfs.mr

import com.argcv.valhalla.console.ColorForConsole._
import com.argcv.valhalla.utils.Awakable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ IntWritable, Text }
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat


object FrequencyMapReducer extends Awakable {

  def main(args: Array[String]): Unit = {
    // create a hadoop job and set main class
    val job = Job.getInstance()
    job.setJarByClass(FrequencyMapReducer.getClass)
    job.setJobName("Frequency")

    // set the input & output path
    FileInputFormat.addInputPath(job, new Path(args.head))
    FileOutputFormat.setOutputPath(job, new Path(s"${args(1)}-${System.currentTimeMillis()}"))

    // set mapper & reducer
    job.setMapperClass(FrequencyMapper.instance)
    job.setReducerClass(FrequencyReducer.instance)

    // specify the type of the output
    job.setOutputKeyClass(new Text().getClass)
    job.setOutputValueClass(new IntWritable().getClass)

    // run
    logger.info(s"job finished, status [${if (job.waitForCompletion(true)) "OK".withColor(GREEN) else "FAILED".withColor(RED)}]")
  }

} 
Developer: yuikns, Project: iphigenia, Lines: 37, Source: FrequencyMapReducer.scala


Example 18: FrequencyMapper

// Package declaration and dependency imports
package com.argcv.iphigenia.example.hdfs.mr

import org.apache.hadoop.io.{ IntWritable, LongWritable, Text }
import org.apache.hadoop.mapreduce.Mapper


class FrequencyMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
  type Context = Mapper[LongWritable, Text, Text, IntWritable]#Context

  override def map(offset: LongWritable, lineText: Text, context: Context): Unit = {
    val line = lineText.toString
    val eventID: String = line.split(",")(1)
    context.write(new Text(eventID), FrequencyMapper.ONE)
  }
}

object FrequencyMapper {
  def instance = new FrequencyMapper().getClass

  lazy val ONE = new IntWritable(1)
} 
Developer: yuikns, Project: iphigenia, Lines: 22, Source: FrequencyMapper.scala
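Examples 3 and 17 wire up a FrequencyReducer that is not shown on this page. For completeness, here is a hypothetical sketch of what such a reducer conventionally looks like; the shape is inferred from the mapper above and is an assumption, not the projects' actual code:

import org.apache.hadoop.io.{ IntWritable, Text }
import org.apache.hadoop.mapreduce.Reducer

import scala.collection.JavaConverters._

// hypothetical counterpart to FrequencyMapper: sums the 1s emitted per event ID
class FrequencyReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
  type Context = Reducer[Text, IntWritable, Text, IntWritable]#Context

  override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: Context): Unit = {
    val sum = values.asScala.foldLeft(0)(_ + _.get)
    context.write(key, new IntWritable(sum))
  }
}

object FrequencyReducer {
  def instance = new FrequencyReducer().getClass
}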


Example 19:

// Package declaration and dependency imports
package it.agilelab.bigdata.spark.search.examples

import scala.io.BufferedSource
import scala.xml.pull._
import java.io.ByteArrayInputStream

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.io.{LongWritable, Text}
import com.databricks.spark.xml.XmlInputFormat
import it.agilelab.bigdata.spark.search.evaluation.utils.wikipage



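		// NOTE: the head of this example was truncated when the page was scraped. Everything
		// down to `val event = reader.next()` below is a hedged reconstruction based on the
		// imports and the surviving body (the object name is inferred from the source file
		// ParseWikipediaDump.scala); it is an assumption, not the project's original code.
object ParseWikipediaDump {
	def main(args: Array[String]): Unit = {
		val inputPath = args(0)
		val outputPath = args(1)
		val conf = new SparkConf().setAppName("ParseWikipediaDump")
		val sc = new SparkContext(conf)

		// split the dump into <page>...</page> records with spark-xml's XmlInputFormat
		sc.hadoopConfiguration.set(XmlInputFormat.START_TAG_KEY, "<page>")
		sc.hadoopConfiguration.set(XmlInputFormat.END_TAG_KEY, "</page>")
		val records = sc.newAPIHadoopFile(inputPath, classOf[XmlInputFormat],
			classOf[LongWritable], classOf[Text]).map(_._2.toString)

		// pull-parse one <page> element into its title and text
		def parsePage(page: String): (String, String) = {
			val reader = new XMLEventReader(new BufferedSource(new ByteArrayInputStream(page.getBytes("UTF-8"))))
			var inPage, inTitle, inText, pageReady = false
			var title, text = ""
			while (reader.hasNext && !pageReady) {
				val event = reader.next()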
				event match {
					case event: EvElemStart => {
						val castedEvent = event.asInstanceOf[EvElemStart]
						if (castedEvent.label.equalsIgnoreCase("page")) {
							inPage = true
						} else if (inPage && castedEvent.label.equalsIgnoreCase("title")) {
							inTitle = true
						} else if (inPage && castedEvent.label.equalsIgnoreCase("text")) {
							inText = true
						}
					}
					case event: EvText => {
						val castedEvent = event.asInstanceOf[EvText]
						if (inTitle) {
							title = castedEvent.text
						} else if (inText) {
							text = if (text == "") castedEvent.text else text + "\n" + castedEvent.text // hedged fix: join successive text events with a newline (the scraped original was garbled here)
						}
					}
					case event: EvElemEnd => {
						val castedEvent = event.asInstanceOf[EvElemEnd]
						if (castedEvent.label.equalsIgnoreCase("page")) {
							inPage = false
							pageReady = true
						} else if (inPage && castedEvent.label.equalsIgnoreCase("title")) {
							inTitle = false
						} else if (inPage && castedEvent.label.equalsIgnoreCase("text")) {
							inText = false
						}
					}
					case _ => ;
				}
			}
			(title, text)
		}
		
		// parse records into tuples
		val wikipedia = records map { parsePage(_) }
		
		// map tuple into wikipage
		val wikipages = wikipedia map { case (title, text) => wikipage(title, text) }
		
		// save as object file of wikipage
		wikipages.coalesce(50).saveAsObjectFile(outputPath)
	}
} 
Developer: agile-lab-dev, Project: sparksearchengine, Lines: 60, Source: ParseWikipediaDump.scala


Example 20: AutoTainterTest

// Package declaration and dependency imports
package edu.anonymous.cs.dft

import java.io.File

import edu.columbia.cs.psl.phosphor.runtime.Tainter
import edu.anonymous.cs.dft.tracker.{FullAutoTainter, TextAutoTainter}
import org.apache.hadoop.io.Text


object AutoTainterTest {

  def main(args: Array[String]): Unit = {
    val file = "/tmp/autoTainter.conf"

    val testString = "max 23 male"
    val autoTainter = if (new File(file).exists) {
      new TextAutoTainter(file)
    } else {
      new FullAutoTainter
    }
    autoTainter.initEngine()
    val taintedString = autoTainter.setTaint(testString)
    val name = taintedString.substring(0, 3)
    val age = taintedString.substring(4, 6)
    val gender = taintedString.substring(7, 9)
    val a = new Text
    a.set(age)
    val b = a.toString.toInt
    assert(Tainter.getTaint(age) == 2)
    assert(Tainter.getTaint(name) == 1)
    assert(Tainter.getTaint(gender) == 3)
    println(taintedString)
  }

} 
Developer: acsac17-p78, Project: kakute, Lines: 36, Source: AutoTainterTest.scala



Note: The org.apache.hadoop.io.Text class examples in this article were collected from source code and documentation platforms such as GitHub and MSDocs, with snippets selected from open-source projects contributed by their authors. Copyright of the source code remains with the original authors; consult the corresponding project's license before distributing or using the code. Do not republish without permission.

