• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Path类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.hadoop.fs.Path的典型用法代码示例。如果您正苦于以下问题:Scala Path类的具体用法?Scala Path怎么用?Scala Path使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Path类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: HdfsUtils

//设置package包名称以及导入依赖的类
package com.appleeye.spark.utils

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

object HdfsUtils {
  def pathExists(path: String, sc: SparkContext): Boolean = {
    val conf = sc.hadoopConfiguration
    val fs = FileSystem.get(conf)
    fs.exists(new Path(path))
  }

  def getFullPath(path:String, sc: SparkContext): String = {
    val conf = sc.hadoopConfiguration
    val fs = FileSystem.get(conf)
    fs.getFileStatus(new Path(path)).getPath().toString
  }

  def getAllFiles(path:String, sc: SparkContext): Seq[String] = {
    val conf = sc.hadoopConfiguration
    val fs = FileSystem.get(conf)
    val files = fs.listStatus(new Path(path))
    files.map(_.getPath().toString)
  }
} 
开发者ID:MiracleZhou,项目名称:SparkStreaming,代码行数:26,代码来源:HdfsUtils.scala


示例2: HdfsClientConf

//设置package包名称以及导入依赖的类
package pub.ayada.scala.hdfsutils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem

class HdfsClientConf private (val coreStiteXMLPath: String, val hdfsStiteXMLPath: String) {
    private val conf = new Configuration();
    conf.addResource(new Path(coreStiteXMLPath));
    conf.addResource(new Path(hdfsStiteXMLPath));

    def getHdfsClientConf(): Configuration = conf

    def getHdpFileSystem(): FileSystem = FileSystem.get(conf);
}
object HdfsClientConf {

    private var instance: HdfsClientConf = null

    def getOneTimeInstance(coreStiteXMLPath: String, hdfsStiteXMLPath: String): Configuration = {
        new HdfsClientConf(coreStiteXMLPath, hdfsStiteXMLPath).getHdfsClientConf()
    }
    def setSingltonInstance(coreStiteXMLPath: String, hdfsStiteXMLPath: String): Configuration = {
        if (instance == null)
            instance = new HdfsClientConf(coreStiteXMLPath, hdfsStiteXMLPath)

        instance.getHdfsClientConf()
    }
    def getSingletonInstance(): HdfsClientConf = {
        if (instance == null)
            throw new NullPointerException("Instanciate HdfsClientConf before retriving")

        instance
    }
} 
开发者ID:k-ayada,项目名称:HdfsUtils,代码行数:36,代码来源:HdfsClientConf.scala


示例3: HDFS

//设置package包名称以及导入依赖的类
package org.mireynol.util

import java.io.{BufferedInputStream, OutputStreamWriter}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.mutable.ListBuffer
import scala.io.Source

object HDFS {

  def log : Logger = LoggerFactory.getLogger( HDFS.getClass )

  val hadoop : FileSystem = {
    val conf = new Configuration( )
    conf.set( "fs.defaultFS", "hdfs://localhost:9000" )
    FileSystem.get( conf )
  }

  def readAndMap( path : String, mapper : ( String ) => Unit ) = {
    if ( hadoop.exists( new Path( path ) ) ) {
      val is = new BufferedInputStream( hadoop.open( new Path( path ) ) )
      Source.fromInputStream( is ).getLines( ).foreach( mapper )
    }
    else {
      // TODO - error logic here
    }
  }

  def write( filename : String, content : Iterator[ String ] ) = {
    val path = new Path( filename )
    val out = new OutputStreamWriter( hadoop.create( path, false ) )
    content.foreach( str => out.write( str + "\n" ) )
    out.flush( )
    out.close( )
  }

  def ls( path : String ) : List[ String ] = {
    val files = hadoop.listFiles( new Path( path ), false )
    val filenames = ListBuffer[ String ]( )
    while ( files.hasNext ) filenames += files.next( ).getPath( ).toString( )
    filenames.toList
  }

  def rm( path : String, recursive : Boolean ) : Unit = {
    if ( hadoop.exists( new Path( path ) ) ) {
      println( "deleting file : " + path )
      hadoop.delete( new Path( path ), recursive )
    }
    else {
      println( "File/Directory" + path + " does not exist" )
      log.warn( "File/Directory" + path + " does not exist" )
    }
  }

  def cat( path : String ) = Source.fromInputStream( hadoop.open( new Path( path ) ) ).getLines( ).foreach( println )

} 
开发者ID:reynoldsm88,项目名称:spark-drools,代码行数:61,代码来源:HDFS.scala


示例4: RedditVariationApp

//设置package包名称以及导入依赖的类
package org.apress.prospark

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Milliseconds, Seconds, StreamingContext }
import org.apache.hadoop.io.{ Text, LongWritable, IntWritable }
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.dstream.DStream
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{ TextOutputFormat => NewTextOutputFormat }
import org.apache.spark.streaming.dstream.PairDStreamFunctions
import org.apache.log4j.LogManager
import org.json4s._
import org.json4s.native.JsonMethods._
import java.text.SimpleDateFormat
import java.util.Date

object RedditVariationApp {
  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println(
        "Usage: RedditVariationApp <appname> <input_path>")
      System.exit(1)
    }
    val Seq(appName, inputPath) = args.toSeq
    val LOG = LogManager.getLogger(this.getClass)

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)

    val ssc = new StreamingContext(conf, Seconds(1))
    LOG.info("Started at %d".format(ssc.sparkContext.startTime))

    val comments = ssc.fileStream[LongWritable, Text, TextInputFormat](inputPath, (f: Path) => true, newFilesOnly = false).map(pair => pair._2.toString)

    val merged = comments.union(comments)

    val repartitionedComments = comments.repartition(4)

    val rddMin = comments.glom().map(arr =>
      arr.minBy(rec => ((parse(rec) \ "created_utc").values.toString.toInt)))

    ssc.start()
    ssc.awaitTermination()

  }
} 
开发者ID:ZubairNabi,项目名称:prosparkstreaming,代码行数:50,代码来源:L3-DStreamVariation.scala


示例5: list

//设置package包名称以及导入依赖的类
package uk.co.odinconsultants.bitcoin.integration.hadoop

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.DistributedFileSystem
import uk.co.odinconsultants.bitcoin.core.Logging
import uk.co.odinconsultants.bitcoin.integration.hadoop.HadoopForTesting.hdfsCluster

import scala.collection.mutable.ArrayBuffer

trait MiniHadoopClusterRunning extends Logging {

  val distributedFS: DistributedFileSystem  = hdfsCluster.getFileSystem
  val conf: Configuration                   = HadoopForTesting.conf
  val dir                                   = s"/${this.getClass.getSimpleName}/"

  def list(path: String): List[Path] = {
    info(s"Looking in $path")

    val files = distributedFS.listFiles(new Path(path), true)

    val allPaths = ArrayBuffer[Path]()
    while (files.hasNext) {
      val file = files.next
      allPaths += file.getPath
    }

    allPaths.toList
  }

  def copyToHdfs(inputFile: Path): Path = {
    val fromFile  = inputFile.getName
    distributedFS.mkdirs(new Path(dir))
    val toFile    = new Path(dir + fromFile)
    info(s"Copying '$fromFile' to '$toFile' (${toFile.getName})")
    distributedFS.copyFromLocalFile(false, true, inputFile, toFile)
    toFile
  }

  def localFile(local: String): Path = {
    val classLoader = getClass.getClassLoader
    val localFQN    = classLoader.getResource(local).getFile
    new Path(localFQN)
  }
} 
开发者ID:PhillHenry,项目名称:Cryptorigin,代码行数:46,代码来源:MiniHadoopClusterRunning.scala


示例6: Sentiment140Downloader

//设置package包名称以及导入依赖的类
package com.aluxian.tweeather.scripts

import java.net.URL
import java.util.zip.ZipInputStream

import org.apache.hadoop.fs.Path
import org.apache.spark.Logging


object Sentiment140Downloader extends Script with Logging {

  val downloadUrl = "http://cs.stanford.edu/people/alecmgo/trainingandtestdata.zip"

  override def main(args: Array[String]) {
    super.main(args)

    logInfo(s"Downloading sentiment140 dataset from $downloadUrl")
    val zip = new ZipInputStream(new URL(downloadUrl).openStream())
    val buffer = new Array[Byte](4 * 1024)

    Stream.continually(zip.getNextEntry)
      .takeWhile(_ != null)
      .foreach { entry =>
        val fileName = entry.getName
        val out = hdfs.create(new Path(s"tw/sentiment/140/downloaded/$fileName"))
        logInfo(s"Downloading $fileName")

        Stream.continually(zip.read(buffer))
          .takeWhile(_ != -1)
          .foreach { count =>
            out.write(buffer, 0, count)
          }

        out.close()
      }

    zip.close()
    logInfo("Downloading finished")
    sc.stop()
  }

} 
开发者ID:cnajeefa,项目名称:Tourism-Sentiment-Analysis,代码行数:43,代码来源:Sentiment140Downloader.scala


示例7: BenchmarkYarnPrepare

//设置package包名称以及导入依赖的类
package com.microsoft.spark.perf

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object BenchmarkYarnPrepare {

  private def uploadFile(localPath: String, remoteWorkingDir: String): Unit = {
    new Path(remoteWorkingDir).getFileSystem(new Configuration()).
      copyFromLocalFile(new Path(localPath), new Path(remoteWorkingDir))
  }

  private def createRemoteWorkingDir(
      remoteWorkingDir: String,
      localJarPath: String,
      sparkSubmitParamsPath: String,
      benchmarkParamsPath: String): Unit = {
    uploadFile(localJarPath, remoteWorkingDir + "/spark-benchmark.jar")
    uploadFile(sparkSubmitParamsPath, remoteWorkingDir + "/spark.conf")
    uploadFile(benchmarkParamsPath, remoteWorkingDir + "/benchmark.conf")
  }

  def main(args: Array[String]): Unit = {
    val remoteWorkingDir = args(0)
    val localJarPath = args(1)
    val sparkSubmitParamsFilePath = args(2)
    val benchmarkParamsFilePath = args(3)

    createRemoteWorkingDir(remoteWorkingDir, localJarPath, sparkSubmitParamsFilePath,
      benchmarkParamsFilePath)
  }
} 
开发者ID:hdinsight,项目名称:SparkPerf,代码行数:33,代码来源:BenchmarkYarnPrepare.scala


示例8: AvroParquetWriterFn

//设置package包名称以及导入依赖的类
package io.eels.component.parquet.avro

import com.sksamuel.exts.Logging
import io.eels.component.parquet.ParquetWriterConfig
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}


object AvroParquetWriterFn extends Logging {
  def apply(path: Path, avroSchema: Schema): ParquetWriter[GenericRecord] = {
    val config = ParquetWriterConfig()
    AvroParquetWriter.builder[GenericRecord](path)
      .withSchema(avroSchema)
      .withCompressionCodec(config.compressionCodec)
      .withPageSize(config.pageSize)
      .withRowGroupSize(config.blockSize)
      .withDictionaryEncoding(config.enableDictionary)
      .withWriteMode(ParquetFileWriter.Mode.CREATE)
      .withValidation(config.validating)
      .build()
  }
} 
开发者ID:51zero,项目名称:eel-sdk,代码行数:26,代码来源:AvroParquetWriterFn.scala


示例9: FileUtils

//设置package包名称以及导入依赖的类
package com.asto.dmp.xxx.util

import com.asto.dmp.xxx.base.Constants
import com.asto.dmp.ycd.base.Constants
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD


object FileUtils extends Logging {
  private val conf = new Configuration()
  conf.set("fs.defaultFS", Constants.Hadoop.DEFAULT_FS)
  conf.set("mapreduce.jobtracker.address", Constants.Hadoop.JOBTRACKER_ADDRESS)

  def deleteFilesInHDFS(paths: String*) = {
    paths.foreach { path =>
      val filePath = new Path(path)
      val HDFSFilesSystem = filePath.getFileSystem(new Configuration())
      if (HDFSFilesSystem.exists(filePath)) {
        logInfo(s"?????$filePath")
        HDFSFilesSystem.delete(filePath, true)
      }
    }
  }

  def saveAsTextFile[T <: Product](rdd: RDD[T], savePath: String) = {
    deleteFilesInHDFS(savePath)
    logInfo(s"?${savePath}?????")
    rdd.map(_.productIterator.mkString(Constants.OutputPath.SEPARATOR)).coalesce(1).saveAsTextFile(savePath)
  }

  def saveAsTextFile(text: String, savePath: String) = {
    deleteFilesInHDFS(savePath)
    logInfo(s"?${savePath}?????")
    val out = FileSystem.get(conf).create(new Path(savePath))
    out.write(text.getBytes)
    out.flush()
    out.close()
  }
} 
开发者ID:zj-lingxin,项目名称:asto-sparksql,代码行数:42,代码来源:FileUtils.scala


示例10: FrequencyMapReducer

//设置package包名称以及导入依赖的类
package com.argcv.cse8803.mapreducebasic

import com.argcv.valhalla.console.ColorForConsole._
import com.argcv.valhalla.utils.Awakable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat


object FrequencyMapReducer extends Awakable {

  def main(args: Array[String]): Unit = {
    // create a hadoop job and set main class
    val job = Job.getInstance()
    job.setJarByClass(FrequencyMapReducer.getClass)
    job.setJobName("Frequency")

    // set the input & output path
    FileInputFormat.addInputPath(job, new Path(args.head))
    FileOutputFormat.setOutputPath(job, new Path(s"${args(1)}-${System.currentTimeMillis()}"))

    // set mapper & reducer
    job.setMapperClass(FrequencyMapper.instance)
    job.setReducerClass(FrequencyReducer.instance)

    // specify the type of the output
    job.setOutputKeyClass(new Text().getClass)
    job.setOutputValueClass(new IntWritable().getClass)

    // run
    logger.info(s"job finished, status [${if (job.waitForCompletion(true)) "OK".withColor(GREEN) else "FAILED".withColor(RED)}]")
  }

} 
开发者ID:yuikns,项目名称:cse8803,代码行数:37,代码来源:FrequencyMapReducer.scala


示例11: Converter

//设置package包名称以及导入依赖的类
package com.dataoptimo.imgprocessing.convert
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.io.IOUtils
import org.apache.hadoop.io.SequenceFile
import java.io.IOException
import java.lang.IllegalArgumentException

class Converter(conf: Configuration) {
  
  def imageToSequence(srcPath: String, dstPath: String){
    try {
    val fs = FileSystem.get(conf);
    val inPath = new Path(srcPath);
    val outPath = new Path(dstPath);
    val key = new Text();
    val value = new BytesWritable();
    val in = fs.open(inPath);
    val buffer = new Array[Byte](in.available())
    in.read(buffer);
    var writer = SequenceFile.createWriter(fs, conf, outPath, key.getClass(),value.getClass());
    writer.append(new Text(inPath.getName()), new BytesWritable(buffer));
    IOUtils.closeStream(writer);
    }
    catch {
      case io: IOException => println(io.getMessage)
      case illegalArgument: IllegalArgumentException => println(illegalArgument.getMessage)
    }
    
    
    
  }
} 
开发者ID:mfawadalam,项目名称:imgprocessing,代码行数:36,代码来源:Converters.scala


示例12: HdfsWriteTest

//设置package包名称以及导入依赖的类
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.scalatest._

class HdfsWriteTest extends FlatSpec with Matchers {
  "Hello" should "have tests" in {

    def write(uri: String, filePath: String, data: Array[Byte]) = {
//      System.setProperty("HADOOP_USER_NAME", "Mariusz")
      val path = new Path(filePath)
      val conf = new Configuration()
      conf.set("fs.defaultFS", uri)
      val fs = FileSystem.get(conf)
      val os = fs.create(path)
      os.write(data)
      fs.close()
    }

    write("hdfs://0.0.0.0:19000", "hdfs://0.0.0.0:19000/user/cloudera/test.txt", "Hello World".getBytes)


  }
} 
开发者ID:ralreiroe,项目名称:embarcadero,代码行数:24,代码来源:HdfsWriteTest.scala


示例13: main

//设置package包名称以及导入依赖的类
package io.github.qf6101.topwords

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession


  def main(args: Array[String]) {
    // setup spark session
    val spark = SparkSession.builder().master("local[1]").appName(this.getClass.toString).getOrCreate()
    val inputFile = "test_data/story_of_stone.txt"
    val outputFile = "test_data/test_output"
    val files = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    if (files.exists(new Path(outputFile))) files.delete(new Path(outputFile), true)
    val corpus = spark.sparkContext.textFile(inputFile)
    new TopWORDS(
      tauL = 10,
      tauF = 5,
      textLenThld = 2000,
      useProbThld = 1E-8,
      numIterations = 10,
      convergeTol = 1E-3,
      wordBoundaryThld = 0.0)
      .run(corpus, outputFile + "/dictionary", outputFile + "/segmented_texts")
  }
} 
开发者ID:qf6101,项目名称:topwords,代码行数:26,代码来源:TestTopWORDS.scala


示例14: ConvertsSpec

//设置package包名称以及导入依赖的类
package com.newegg.eims.DataPorter.HDFS

import java.io.File

import com.newegg.eims.DataPorter.HDFS.Converts._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation
import org.scalatest.{FlatSpec, Matchers}


class ConvertsSpec extends FlatSpec with Matchers {
  val currentDir = new File(".").getCanonicalPath + File.separator + "target" + File.separator

  "hdfs" should "can do with hadoop user" in {
    var test = 1
    new File(currentDir + "test.txt").doAs(UserGroupInformation.createRemoteUser("vq83"), f => test = 2)
    test should be(2)
  }

  it should "no copy file when not exists file path" in {
    new File(currentDir + "test.txt").copyToHDFS(new Path(currentDir + "test1.txt"), new Configuration())
    new File(currentDir + "test1.txt").exists() shouldBe false
  }

  it should "copy file when exists file path" in {
    val f = new File(currentDir + "test.txt")
    f.createNewFile()
    f.exists() shouldBe true
    new File(currentDir + "test.txt").copyToHDFS(new Path(currentDir + "test1.txt"), new Configuration())
    val f2 = new File(currentDir + "test1.txt")
    f2.exists() shouldBe true
    f.delete() shouldBe true
    f2.delete() shouldBe true
  }
} 
开发者ID:CodeBabyBear,项目名称:DataPorter,代码行数:37,代码来源:ConvertsSpec.scala


示例15: saveParquet

//设置package包名称以及导入依赖的类
package com.newegg.eims.DataPorter.Parquet

import com.newegg.eims.DataPorter.Base._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import parquet.hadoop.ParquetOutputFormat
import parquet.hadoop.metadata.CompressionCodecName


    def saveParquet(path: String, hadoopConf: JobConf = new JobConf(),
                    compressionCodecName: CompressionCodecName = CompressionCodecName.SNAPPY): Path = {
      hadoopConf.set(ParquetOutputFormat.COMPRESSION, compressionCodecName.name())
      val rows = set.toDataRowSet.toRowIterator
      val schema = rows.getSchema
      val writer = ParquetFileFormat.prepareWrite(schema, new Path(path), hadoopConf)
      try {
        while (rows.hasNext) {
          val row = rows.next()
          writer.write(row)
        }
      } finally {
        writer.close()
      }
      new Path(path)
    }
  }

} 
开发者ID:CodeBabyBear,项目名称:DataPorter,代码行数:29,代码来源:Converts.scala


示例16: ParquetOutputWriter

//设置package包名称以及导入依赖的类
package com.newegg.eims.DataPorter.Parquet

import com.newegg.eims.DataPorter.Base.{DataSetSchema, IDataRow}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{JobConf, TaskAttemptContextImpl}
import org.apache.hadoop.mapreduce.TaskAttemptContext
import parquet.hadoop.ParquetOutputFormat
import parquet.hadoop.api.WriteSupport


class ParquetOutputWriter(dataSetSchema: DataSetSchema, path: Path, conf: JobConf) {

  class IDataRowParquetOutputFormat(support: ParquetWriteSupport, filePath: Path) extends ParquetOutputFormat[IDataRow]() {
    override def getWriteSupport(configuration: Configuration): WriteSupport[IDataRow] = {
      support
    }

    override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
      filePath
    }
  }

  private val context = new TaskAttemptContextImpl(conf, new org.apache.hadoop.mapred.TaskAttemptID())
  private val formatter = {
    val support = new ParquetOutputFormat[IDataRow]().getWriteSupport(conf).asInstanceOf[ParquetWriteSupport]
    support.setSchema(dataSetSchema)
    new IDataRowParquetOutputFormat(support, path)
  }

  private val recordWriter = formatter.getRecordWriter(context)

  def write(row: IDataRow): Unit = recordWriter.write(null, row)

  def close(): Unit = recordWriter.close(context)
} 
开发者ID:CodeBabyBear,项目名称:DataPorter,代码行数:37,代码来源:ParquetOutputWriter.scala


示例17: ParquetInputReader

//设置package包名称以及导入依赖的类
package com.newegg.eims.DataPorter.Parquet

import com.newegg.eims.DataPorter.Base.DataSetSchema
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import parquet.hadoop.{ParquetFileReader, ParquetReader}


class ParquetInputReader(path: Path, conf: JobConf) extends Iterable[ParquetDataRow] {
  ParquetFileFormat.prepare(conf)
  private val footer = ParquetFileReader.readFooter(conf, path)
  private val schema = new ParquetSchemaConverter(conf).convert(footer.getFileMetaData.getSchema)

  def getSchema: DataSetSchema = schema

  override def iterator: Iterator[ParquetDataRow] = new Iterator[ParquetDataRow] {
    private val support = new ParquetReadSupport
    support.setSchema(schema)
    private val reader = ParquetReader.builder(support, path).withConf(conf).build()
    private var current: ParquetDataRow = _
    nextRow()

    private def nextRow() = {
      current = reader.read()
      if (!hasNext) reader.close()
    }

    override def hasNext: Boolean = current != null

    override def next(): ParquetDataRow = {
      val res = current
      if (hasNext) {
        nextRow()
      }
      res
    }
  }
} 
开发者ID:CodeBabyBear,项目名称:DataPorter,代码行数:39,代码来源:ParquetInputReader.scala


示例18: SessionDataFileHDFSWriter

//设置package包名称以及导入依赖的类
package com.malaska.spark.training.streaming.dstream.sessionization

import java.io.BufferedWriter
import java.io.FileWriter
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import java.io.OutputStreamWriter
import org.apache.hadoop.fs.Path
import java.util.Random

object SessionDataFileHDFSWriter {
  
  val eol = System.getProperty("line.separator");  
  
  def main(args: Array[String]) {
    if (args.length == 0) {
        println("SessionDataFileWriter {tempDir} {distDir} {numberOfFiles} {numberOfEventsPerFile} {waitBetweenFiles}");
        return;
    }
    val conf = new Configuration
    conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"))
    conf.addResource(new Path("/etc/hadoop/conf/mapred-site.xml"))
    conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"))
    
    val fs = FileSystem.get(new Configuration)
    val rootTempDir = args(0)
    val rootDistDir = args(1)
    val files = args(2).toInt
    val loops = args(3).toInt
    val waitBetweenFiles = args(4).toInt
    val r = new Random
    for (f <- 1 to files) {
      val rootName = "/weblog." + System.currentTimeMillis()
      val tmpPath = new Path(rootTempDir + rootName + ".tmp")
      val writer = new BufferedWriter(new OutputStreamWriter(fs.create(tmpPath)))
      
      print(f + ": [")
      
      val randomLoops = loops + r.nextInt(loops)
      
      for (i <- 1 to randomLoops) {
        writer.write(SessionDataGenerator.getNextEvent + eol)
        if (i%100 == 0) {
          print(".")
        }
      }
      println("]")
      writer.close
      
      val distPath = new Path(rootDistDir + rootName + ".dat")
      
      fs.rename(tmpPath, distPath)
      Thread.sleep(waitBetweenFiles)
    }
    println("Done")
  }
} 
开发者ID:TedBear42,项目名称:spark_training,代码行数:58,代码来源:SessionDataFileHDFSWriter.scala


示例19: ACMEData

//设置package包名称以及导入依赖的类
package com.cloudera.datascience.cdsw.acme

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}

object ACMEData {

  
  def readData(): DataFrame = {
    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._
    
    // Uh oh. The data actually has an extra ID column at the front! 
    // Needs to be dropped manually
    
    val rawInput = "hdfs:///tmp/datatraining.txt"
    val csvInput = "hdfs:///tmp/datatraining.csv"
    
    val csvInputPath = new Path(csvInput)
    val fs = csvInputPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
    if (fs.exists(csvInputPath)) {
      fs.delete(csvInputPath, true)
    }
    
    spark.read.textFile(rawInput).
      map { line =>
        if (line.startsWith("\"date\"")) {
          line
        } else {
          line.substring(line.indexOf(',') + 1)
        }
      }.
      repartition(1).
      write.text(csvInput)
    
    spark.read.
      option("inferSchema", true).
      option("header", true).
      csv(csvInput).
      drop("date")
  }

} 
开发者ID:srowen,项目名称:cdsw-simple-serving,代码行数:44,代码来源:ACMEData.scala


示例20: Import

//设置package包名称以及导入依赖的类
package spark.in.space

import java.io.File
import java.net.URI

import com.amazonaws.auth.AnonymousAWSCredentials
import com.amazonaws.event.{ProgressEvent, ProgressListener}
import com.amazonaws.services.s3.transfer.TransferManager
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}



object Import {
  val log = org.slf4j.LoggerFactory.getLogger("Import")
  val bucket = sys.env("SPARK_S3_BUCKET_NAME")
  val n1gram = "ngrams/books/20090715/eng-us-all/1gram/data"
  val dlFile = "/app/googleNGram1Gram.lzo.sequenceLongString"
  val publicBucket = "datasets.elasticmapreduce"
  val parqFile = s"s3n://${bucket}${dlFile}.parquet"

  def main(args: Array[String]): Unit = {
    if(! new File(dlFile).exists()) {
      println("downloading")
      val tx = new TransferManager(new AnonymousAWSCredentials())
      val download = tx.download(publicBucket, n1gram, new File(dlFile))
      download.addProgressListener(new ProgressListener {
        override def progressChanged(progressEvent: ProgressEvent): Unit = {
          println(s"type: ${progressEvent.getEventType.name()} bytes: ${progressEvent.getBytes} transferred:${progressEvent.getBytesTransferred}")
        }
      })
      download.waitForCompletion()
    }
    val conf = new SparkConf().setAppName("spark-singularity")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    val theRDD = sc.sequenceFile[Long,String](s"file://$dlFile", 80)
    val theParsed = theRDD.map{ case (_, row) => rowToNGram(row) }
    val df = theParsed.toDF
    FileSystem.get(new URI(s"s3n://$bucket"), sc.hadoopConfiguration).delete(new Path(dlFile), true)
    df.write.parquet(parqFile)

  }

  def rowToNGram(in: String) : NGram = {
    val split = in.split("\t")
    NGram(split(0), split(1), split(2).toInt, split(3).toInt, split(4).toInt)
  }
}



case class NGram(ngram:String, year:String, occurrences:Int, pages:Int, books:Int) 
开发者ID:heroku,项目名称:spark-singularity,代码行数:55,代码来源:Import.scala



注:本文中的org.apache.hadoop.fs.Path类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala LocalDateTime类代码示例发布时间:2022-05-23
下一篇:
Scala InputStreamReader类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap