Scala SaveMode Class Code Examples


This article compiles typical usage examples of the org.apache.spark.sql.SaveMode class in Scala. If you are wondering what SaveMode is for, how to call it, or what it looks like in real projects, the curated class examples below should help.



The 20 code examples of the SaveMode class shown below are ordered roughly by popularity.
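Before diving into the examples, a quick orientation: SaveMode is the enum passed to DataFrameWriter.mode(...), and it decides what happens when the write target already exists (Append, Overwrite, ErrorIfExists, Ignore). The minimal sketch below is illustrative only; the object name and output paths are made up, but the calls mirror what the examples that follow do.

import org.apache.spark.sql.{SaveMode, SparkSession}

object SaveModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SaveModeSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // A tiny DataFrame to write out under different save modes
    val df = Seq(("a", 1), ("b", 2)).toDF("key", "value")

    // SaveMode controls behavior when the target already exists:
    //   Append        - add the new rows to the existing data
    //   Overwrite     - replace the existing data
    //   ErrorIfExists - fail the write (the default)
    //   Ignore        - silently skip the write
    df.write.mode(SaveMode.Overwrite).parquet("/tmp/savemode-sketch/parquet")
    df.write.mode(SaveMode.Append).format("json").save("/tmp/savemode-sketch/json")

    spark.stop()
  }
}

Older examples below use SQLContext or HiveContext instead of SparkSession; the writer API and SaveMode semantics are the same.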

Example 1: DatabaseBackup

// Package declaration and imported dependencies
package unus.stage

import unus.helpers.Conf
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
import scala.reflect.runtime.universe._
import org.apache.spark.sql.SaveMode

class DatabaseBackup[T: TypeTag](table: String) {
  private lazy val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]

  def save(): Unit = {
    Conf.spark.read
      .format("jdbc")
      .option("url", Conf.dbUrl)
      .option("dbtable", s""""$table"""")
      .option("user", Conf.dbUsername)
      .option("password", Conf.dbPassword)
      .option("driver", "org.postgresql.Driver")
      .load()
      .write
      .format("csv")
      .option("header", "true")
      .save(Conf.dataDir + "/" + table + ".csv")
  }

  def load(): Unit = {
    Conf.spark.read
      .format("csv")
      .option("header", "true")
      .schema(schema)
      .load(Conf.dataDir + "/" + table + ".csv.gz")
      .write
      .format("jdbc")
      .option("url", Conf.dbUrl)
      .option("dbtable", s""""$table"""")
      .option("user", Conf.dbUsername)
      .option("password", Conf.dbPassword)
      .option("driver", "org.postgresql.Driver")
      .mode(SaveMode.Append)
      .save()
  }
} 
Author: mindfulmachines | Project: unus | Lines: 44 | Source file: DatabaseBackup.scala


Example 2: ParquetS3Backup

// Package declaration and imported dependencies
package com.unity.analytics.spark.utils.parquet

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{Logging, SparkConf, SparkContext}


object ParquetS3Backup extends Logging{
  implicit val formats = org.json4s.DefaultFormats

  def main(args: Array[String]): Unit = {
    val config = new ParquetS3BackupConfiguration(args)
    val sparkConf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sqlContext = new SQLContext(new SparkContext(sparkConf))
    if (config.merge()) {
      merge(sqlContext, config.srcDir(), config.destDir(), config.numFiles())
    } else {
      split(sqlContext, config.srcDir(), config.destDir(), config.numFiles())
    }
  }

  
  // Reads, then merges Parquet files and writes to destDir
  def merge(sqlContext: SQLContext, srcDir: String, destDir: String, destNumFiles: Int): Unit = {
    logInfo(s"ParquetS3Backup merge - srcDir: $srcDir, destDir: $destDir, destNumFiles: $destNumFiles")
    sqlContext.read.parquet(srcDir)
      .coalesce(destNumFiles)
      .write.mode(SaveMode.Overwrite).parquet(destDir)
  }

  // Reads, then splits Parquet files and writes to destDir
  def split(sqlContext: SQLContext, srcDir: String, destDir: String, destNumFiles: Int): Unit = {
    logInfo(s"ParquetS3Backup split - srcDir: $srcDir, destDir: $destDir, destNumFiles: $destNumFiles")
    sqlContext.read.parquet(srcDir)
      .repartition(destNumFiles)
      .write.mode(SaveMode.Overwrite).parquet(destDir)
  }

  //  Reads backupMetadata and does a Backup on each srcDir to destDir, to the set number of files
  def backup(sqlContext: SQLContext, backupMetadata: BackupMetadata): Unit = {
    backupMetadata.backupEntries.foreach(backupEntry => {
      if (backupEntry.destNumFiles <= backupEntry.srcNumFiles) {
        merge(sqlContext, backupEntry.srcDir, backupEntry.destDir, backupEntry.destNumFiles)
      } else {
        split(sqlContext, backupEntry.srcDir, backupEntry.destDir, backupEntry.destNumFiles)
      }
    })
  }

  // Reads backupMetadata and restores from destDir to the srcDir, bringing back the original number of files
  def restore(sqlContext: SQLContext, backupMetadata: BackupMetadata): Unit = {
    backupMetadata.backupEntries.foreach(backupEntry => {
      if (backupEntry.srcNumFiles <= backupEntry.destNumFiles) {
        merge(sqlContext, backupEntry.destDir, backupEntry.srcDir, backupEntry.srcNumFiles)
      } else {
        split(sqlContext, backupEntry.destDir, backupEntry.srcDir, backupEntry.srcNumFiles)
      }
    })
  }
} 
Author: UnityTech | Project: parquet-s3-backup | Lines: 59 | Source file: ParquetS3Backup.scala


Example 3: TwitterEmoParser

// Package declaration and imported dependencies
package com.aluxian.tweeather.scripts

import com.aluxian.tweeather.RichBoolean
import org.apache.spark.Logging
import org.apache.spark.sql.SaveMode


object TwitterEmoParser extends Script with Logging {

  val positiveEmoticons = TwitterEmoCollector.positiveEmoticons
  val negativeEmoticons = TwitterEmoCollector.negativeEmoticons

  override def main(args: Array[String]) {
    super.main(args)
    import sqlc.implicits._

    // Import data
    logInfo("Parsing text files")
    val data = sc.textFile("tw/sentiment/emo/*.gz")
      .coalesce(99)
      .map(_.stripPrefix("RT").trim)
      .distinct()
      .filter(!_.startsWith("Collected"))
      .filter(!_.startsWith("collected"))
      .map(text => {
        val hasPositive = positiveEmoticons.exists(text.contains)
        val hasNegative = negativeEmoticons.exists(text.contains)
        if (hasPositive ^ hasNegative) (text, hasPositive.toDouble) else null
      })
      .filter(_ != null)

    logInfo("Saving text files")
    data.toDF("raw_text", "label").write.mode(SaveMode.Overwrite)
      .parquet("tw/sentiment/emo/parsed/data.parquet")

    logInfo("Parsing finished")
    sc.stop()
  }

} 
Author: cnajeefa | Project: Tourism-Sentiment-Analysis | Lines: 41 | Source file: TwitterEmoParser.scala


Example 4: StoreFormat

// Package declaration and imported dependencies
package com.sasaki.utils

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.rdd.RDD

import com.sasaki.discretization._

object StoreFormat {

  def rdd2DF(rdd: RDD[Row], sqlContext: SQLContext) = {
    val schema = StructType(
      StructField("role", StringType, nullable = false) ::
      StructField("mark", StringType, nullable = false) ::
      StructField("seqs", ArrayType(StringType), nullable = false) ::
      Nil)
    sqlContext.createDataFrame(rdd, schema)
  }

  def saveAsJSON(rdd: RDD[Row], path: String, sqlContext: SQLContext) = {
    val df = rdd2DF(rdd, sqlContext)
    val saveOptions = Map("header" -> "false", "path" -> path)
    df.write.format("json").mode(SaveMode.Ignore).options(saveOptions).save()
  }

}
Author: sasakigao | Project: log-discrete | Lines: 28 | Source file: StoreFormat.scala


Example 5: DefaultSource

// Package declaration and imported dependencies
package com.rishabh.spark.datasource.s3

import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

class DefaultSource extends RelationProvider with CreatableRelationProvider {

  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]):
  BaseRelation = {
    val accessKey = parameters.getOrElse("accesskey", sys.error("accesskey is required"))
    val secretKey = parameters.getOrElse("secretkey", sys.error("secretkey is required"))
    val fileType = parameters.getOrElse("type", sys.error("filetype is required"))
    val path = parameters.getOrElse("path", sys.error("path is required"))
    val bucket = parameters.getOrElse("bucketName", sys.error("bucket is required"))
    new S3Relation(accessKey, secretKey, fileType, bucket, path, false)(sqlContext)
  }

  override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String,
    String], data: DataFrame): BaseRelation = {
    val accesskey = parameters.getOrElse("accesskey",sys.error("accesskey is required"))
    val secretkey = parameters.getOrElse("secretkey", sys.error("secretkey is required"))
    val bucket = parameters.getOrElse("bucketName", sys.error("bucket is required"))
    val fileType = parameters.getOrElse("type", sys.error("filetype is required"))
    val path = parameters.getOrElse("path", sys.error("path is required"))

    val supported = List("json", "parquet", "csv")
    if (!supported.contains(fileType)) {
      sys.error("fileType " + fileType + " not supported.")
    }
    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoopConf.set("fs.s3a.access.key", accesskey)
    hadoopConf.set("fs.s3a.secret.key", secretkey)
    val s3Path = "s3a://" + bucket + path
    doSave(fileType, data, s3Path)

    new S3Relation(accesskey, secretkey, fileType, bucket, path, true)(sqlContext)
  }

  private def doSave(fileType: String, dataFrame: DataFrame, path: String) = {
    fileType match {
      case "json" =>
        dataFrame.write.json(path)
      case "parquet" =>
        dataFrame.write.parquet(path)
      case "csv" =>
        dataFrame.write.format("com.databricks.spark.csv").save(path)
    }
  }
} 
Author: rishabhbhardwaj | Project: spark-datasource-s3 | Lines: 51 | Source file: DefaultSource.scala


Example 6: ScorePredictor

// Package declaration and imported dependencies
package org.wikimedia.research.recommendation.job.translation

import java.io.File

import org.apache.log4j.{LogManager, Logger}
import org.apache.spark.ml.regression.RandomForestRegressionModel
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

import scala.collection.parallel.mutable.ParArray

object ScorePredictor {
  val log: Logger = LogManager.getLogger(ScorePredictor.getClass)

  def predictScores(spark: SparkSession,
                    modelsInputDir: File,
                    predictionsOutputDir: Option[File],
                    sites: ParArray[String],
                    featureData: DataFrame): Unit = {
    log.info("Scoring items")

    val predictions: Array[DataFrame] = sites.map(target => {
      try {
        log.info("Scoring for " + target)
        log.info("Getting work data for " + target)
        val workData: DataFrame = Utils.getWorkData(spark, featureData, target, exists = false)
        log.info("Loading model for " + target)
        val model = RandomForestRegressionModel.load(
          new File(modelsInputDir, target).getAbsolutePath)
        log.info("Scoring data for " + target)
        val predictions = model
          .setPredictionCol(target)
          .transform(workData)
          .select("id", target)

        predictions
      } catch {
        case unknown: Throwable =>
          log.error("Score for " + target + " failed", unknown)
          val schema = StructType(Seq(
            StructField("id", StringType, nullable = false),
            StructField(target, DoubleType, nullable = true)))
          spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
      }
    }).toArray

    val predictedScores = predictions.reduce((left, right) => left.join(right, Seq("id"), "outer"))

    log.info("Saving predictions")
    predictionsOutputDir.foreach(outputDir =>
      predictedScores.coalesce(1)
        .write
        .mode(SaveMode.ErrorIfExists)
        .option("header", value = true)
        .option("compression", "bzip2")
        .csv(new File(outputDir, "allPredictions").getAbsolutePath))
  }
} 
Author: schana | Project: recommendation-translation | Lines: 59 | Source file: ScorePredictor.scala


Example 7: Average

// Package declaration and imported dependencies
package nl.techdays.bigdataprocessing.demo03

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.sql.hive.HiveContext

case class Average(dimension: String, average: Double)

object Program {
  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("adl-sample-app")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)

    import sqlContext.implicits._

    val measurements = sqlContext.sql("SELECT * FROM measurements")

    measurements
      .map(x => (x.getAs[String]("dimension"), x.getAs[Double]("value")))
      .mapValues(value => (value, 1L))                        // pair each value with a count of 1
      .reduceByKey((l, r) => (l._1 + r._1, l._2 + r._2))      // sum values and counts per dimension
      .map { case (dimension, (sum, count)) => Average(dimension, sum / count) }
      .toDF()
      .write.mode(SaveMode.Append).saveAsTable("averages")
  }
} 
Author: wmeints | Project: techdays2016 | Lines: 27 | Source file: Program.scala


Example 8: DeputadosService

// Package declaration and imported dependencies
package com.nakamura.camara.deputados

import com.nakamura.camara.deputados.deputado.{Deputado}
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.util.{Failure, Success, Try}
import scalaj.http.{Http, HttpResponse}


class DeputadosService(spark: SparkSession) {
  // For implicit conversions like converting RDDs to DataFrames
  import spark.implicits._

  private val logger = org.log4s.getLogger
  private val obterDeputadosEndpoint = "http://www.camara.leg.br/SitCamaraWS/Deputados.asmx/ObterDeputados"

  def getDeputados(): Try[Seq[Deputado]] = {
    logger.info(s"Sending request for deputados...")
    val response: HttpResponse[String] = Http(obterDeputadosEndpoint).postForm.asString
    if (response.isSuccess) {
      Try {
        val xml = scala.xml.XML.loadString(response.body)
        val deputadosNode = xml \\ "deputado"
        val deputados = deputadosNode.map(DeputadoUtils.fromXml)
        logger.info(s"Found ${deputados.length} deputados")
        deputados
      }
    } else {
      Failure(new Error("Request failed."))
    }
  }

  def fetchAndStoreDeputadosData(saveMode: SaveMode = SaveMode.ErrorIfExists): Unit = {
    getDeputados() match {
      case Success(deputados) =>
        val deputadossDs = deputados.toDS()
        deputadossDs.write.mode(saveMode)
          .format("json")
          .save(s"./data/deputados/")
      case Failure(err) =>
        logger.error(s"Failed to save deputados with $err")
    }
  }
} 
Author: akionakamura | Project: camara | Lines: 45 | Source file: DeputadosService.scala


Example 9: ProposicoesServiceTest

// Package declaration and imported dependencies
package com.nakamura.camara.proposicoes

import com.nakamura.camara.proposicoes.proposicao.ListarProposicoesRequest
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.junit.Test
import org.scalatest.Assertions

class ProposicoesServiceTest extends Assertions {
  private val spark = SparkSession
    .builder()
    .appName("ProposicoesServiceTest")
    .master("local[*]")
    .getOrCreate()
  private val service = new ProposicoesService(spark)

  @Test
  def testListarProposicoes(): Unit = {
    val request = ListarProposicoesRequest(ano = 2017, sigla = "PEC")
    val proposicoesTry = service.listarProposicoes(request)
    assert(proposicoesTry.isSuccess)
    assert(proposicoesTry.get.nonEmpty)
  }

  @Test
  def testListarProposicoesFailure(): Unit = {
    val invalidRequest = ListarProposicoesRequest()
    val proposicoesTry = service.listarProposicoes(invalidRequest)
    assert(proposicoesTry.isFailure)
  }

  @Test
  def testListarSiglasProposicoes(): Unit = {
    val siglasTry = service.listSiglasTipoProposioes()
    assert(siglasTry.isSuccess)
    assert(siglasTry.get.nonEmpty)
  }

  @Test
  def runFetchAndStoreHistoricalData(): Unit = {
    service.fetchAndStoreHistoricalData(2010 to 2017 by 1, SaveMode.Ignore)
  }
} 
Author: akionakamura | Project: camara | Lines: 43 | Source file: ProposicoesServiceTest.scala


Example 10: DeputadosServiceTest

// Package declaration and imported dependencies
package com.nakamura.camara.deputados

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.junit.Test
import org.scalatest.Assertions

class DeputadosServiceTest extends Assertions {
  private val spark = SparkSession
    .builder()
    .appName("DeputadosServiceTest")
    .master("local[*]")
    .getOrCreate()
  private val service = new DeputadosService(spark)

  @Test
  def testListarSiglasProposicoes(): Unit = {
    val deputadosTry = service.getDeputados()
    assert(deputadosTry.isSuccess)
    assert(deputadosTry.get.nonEmpty)
  }
  @Test
  def testFetchAndStoreDeputadosData(): Unit = {
    service.fetchAndStoreDeputadosData(SaveMode.Ignore)
  }
} 
Author: akionakamura | Project: camara | Lines: 26 | Source file: DeputadosServiceTest.scala


Example 11: DefaultSource

// Package declaration and imported dependencies
package com.springml.spark.zuora

import com.springml.spark.zuora.model.ZuoraInput
import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType

import scala.collection.mutable


class DefaultSource extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
  @transient val logger = Logger.getLogger(classOf[DefaultSource])

  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation = {
    createRelation(sqlContext, parameters, null)
  }

  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String],
                              schema: StructType): BaseRelation = {
    val email = param(parameters, "email")
    val password = param(parameters, "password")
    val zoql = param(parameters, "zoql")
    val instanceUrl = parameters.getOrElse("instanceURL", "https://rest.zuora.com")
    val apiVersion = parameters.getOrElse("apiVersion", "38.0")

    // TODO
    val pageSizeParam = parameters.getOrElse("pageSize", "1000")
    val pageSize = pageSizeParam.toInt

    val zuoraInput = new ZuoraInput(email, password, zoql, instanceUrl, apiVersion, pageSize)

    val records = new ZuoraReader(zuoraInput) read()
    new DatasetRelation(records, sqlContext, schema)
  }

  override def createRelation(sqlContext: SQLContext,
                              mode: SaveMode,
                              parameters: Map[String, String],
                              data: DataFrame): BaseRelation = {
    logger.error("Save not supported by Zuora connector")
    throw new UnsupportedOperationException
  }

  private def param(parameters: Map[String, String],
                    paramName: String) : String = {
    val paramValue = parameters.getOrElse(paramName,
      sys.error(s"""'$paramName' must be specified for Spark Zuora package"""))

    // Avoid logging sensitive values such as the password
    if (!"password".equals(paramName)) {
      logger.debug("Param " + paramName + " value " + paramValue)
    }

    paramValue
  }

} 
Author: springml | Project: spark-zuora | Lines: 60 | Source file: DefaultSource.scala


Example 12: TwitterEmoCountryParser

// Package declaration and imported dependencies
package com.aluxian.tweeather.scripts

import com.aluxian.tweeather.RichBoolean
import org.apache.spark.Logging
import org.apache.spark.sql.SaveMode


object TwitterEmoCountryParser extends Script with Logging {

  val positiveEmoticons = TwitterEmoCollector.positiveEmoticons
  val negativeEmoticons = TwitterEmoCollector.negativeEmoticons

  override def main(args: Array[String]) {
    super.main(args)
    import sqlc.implicits._

    // Import data
    //for neutral sentiment do (hasPositive & hasNegative)
    logInfo("Parsing text files")
    val data = sc.textFile("tw/sentiment/emoByCountry/*.tar.gz")
      //.coalesce(sc.defaultParallelism)
      .coalesce(99)
      .map(_.stripPrefix("RT").trim)
      .distinct()
      .filter(!_.startsWith("Collected"))
      .filter(!_.startsWith("collected"))
      .map(text => {
        val hasPositive = positiveEmoticons.exists(text.contains)
        val hasNegative = negativeEmoticons.exists(text.contains)
        if (hasPositive ^ hasNegative) Seq(text, hasPositive.toDouble).mkString("||") else null
      })
      .filter(_ != null)
      .map(_.split("\\|\\|"))
      .map(row => (row(0), parseLong(row(1)).getOrElse(0L), row(2), parseDouble(row(3)).getOrElse(-1.0)))
      .filter(row => row._4 != -1.0) // remove rows whose sentiment label did not parse to 0/1

    logInfo("Saving text files")
    data.toDF("country_code", "time_stamp", "raw_text", "label").write.mode(SaveMode.Overwrite)
      .parquet("tw/sentiment/emoByCountry/parsed/data.parquet")

    logInfo("Parsing finished")
    sc.stop()
  }

  def parseLong(str: String):Option[Long] = {
    try {
      Some(str.toLong)
    } catch {
      case e: NumberFormatException => None
    }
  }

  def parseDouble(str: String):Option[Double] = {
    try {
      Some(str.toDouble)
    } catch {
      case e: NumberFormatException => None
    }
  }

} 
Author: cnajeefa | Project: Tourism-Sentiment-Analysis | Lines: 62 | Source file: TwitterEmoCountryParser.scala


Example 13: Sentiment140Parser

// Package declaration and imported dependencies
package com.aluxian.tweeather.scripts

import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SaveMode


object Sentiment140Parser extends Script with Logging {

  override def main(args: Array[String]) {
    super.main(args)

    // Import data
    val testData = sc.textFile("tw/sentiment/140/downloaded/testdata.manual.2009.06.14.csv")
    val trainingData = sc.textFile("tw/sentiment/140/downloaded/training.1600000.processed.noemoticon.csv")

    logInfo(s"Parsing test dataset")
    parse(testData, "tw/sentiment/140/parsed/test.parquet")

    logInfo(s"Parsing training dataset")
    parse(trainingData, "tw/sentiment/140/parsed/training.parquet")

    logInfo("Parsing finished")
    sc.stop()
  }

  def parse(data: RDD[String], filePath: String) {
    val parsed = data
      .filter(_.contains("\",\"")) // ensure correct format
      .map(_.split("\",\"").map(_.replace("\"", ""))) // split columns and remove " marks
      .filter(row => row.forall(_.nonEmpty)) // ensure columns are not empty
      .map(row => (row(0).toDouble, row(5))) // keep sentiment and text only
      .filter(row => row._1 != 2) // remove neutral tweets
      .map(row => (row._1 / 4, row._2)) // normalize sentiment
      .map(row => (row._2, row._1)) // switch values

    import sqlc.implicits._
    parsed.toDF("raw_text", "label").write.mode(SaveMode.Overwrite).parquet(filePath)
    logInfo(s"Parsed and saved $filePath")
  }

} 
Author: cnajeefa | Project: Tourism-Sentiment-Analysis | Lines: 43 | Source file: Sentiment140Parser.scala


Example 14: Writer

// Package declaration and imported dependencies
package com.github.rssanders3.spark.data_format_converter.utils

import com.github.rssanders3.spark.data_format_converter.MainArgs.JobArgs
import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}


object Writer {

  def write(sqlContext: SQLContext, inputDF: DataFrame, jobArgs: JobArgs): Unit = {
    write(sqlContext = sqlContext,
      inputDF = inputDF,
      outputDataType = jobArgs.outputDataType,
      outputFilePath = jobArgs.outputFilePath,
      outputTableName = jobArgs.outputTableName,
      saveMode = jobArgs.getSaveMode()
    )
  }

  def write(sqlContext: SQLContext, inputDF: DataFrame, outputDataType: String, outputFilePath: String, outputTableName: String, saveMode: SaveMode = SaveMode.Overwrite): Unit = {
    var outputDFWriter = inputDF.write.format(outputDataType)
    if(saveMode != null) {
      outputDFWriter = outputDFWriter.mode(saveMode)
    }

    if(outputFilePath != null) {
      outputDFWriter.save(outputFilePath)
    } else if(outputTableName != null) {
      outputDFWriter.saveAsTable(outputTableName)
    } else {
      throw new IllegalArgumentException("Output information has not been provided")
    }

  }


} 
Author: rssanders3 | Project: spark-data-format-converter | Lines: 37 | Source file: Writer.scala


Example 15: WriteTest

// Package declaration and imported dependencies
package com.github.rssanders3.spark.data_format_converter.utils

import java.io.File

import com.github.rssanders3.spark.data_format_converter.common.TestUtilFunctions
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, GivenWhenThen, Matchers}


class WriteTest extends FlatSpec with GivenWhenThen with Matchers with BeforeAndAfterAll {

  private val MASTER = "local[2]"
  private val APP_NAME = this.getClass.getSimpleName
  private val TEST_OUTPUT_DIR = "src/test/resources/test_output/WriteTest"

  private var _sc: SparkContext = _
  private var _sqlContext: SQLContext = _

  def sc = _sc
  def sqlContext = _sqlContext

  val conf: SparkConf = new SparkConf()
    .setMaster(MASTER)
    .setAppName(APP_NAME)

  TestUtilFunctions.deleteTestOutputDirContents(TEST_OUTPUT_DIR)

  override def beforeAll(): Unit = {
    super.beforeAll()
    _sc = new SparkContext(conf)
    _sqlContext = new SQLContext(_sc)
  }

  override def afterAll(): Unit = {
    if (_sc != null) {
      _sc.stop()
      _sc = null
    }
    super.afterAll()
  }

  "Importing as text and exporting as parquet" should "work" in {
//    val inputList: java.util.List[WriteTestObject] = new java.util.ArrayList[WriteTestObject]()
//    inputList.add(new WriteTestObject("key1", "value1"))
//    val inputDF = sqlContext.createDataFrame(inputList, WriteTestObject.getClass)
    val inputDF = sqlContext.read.text("src/test/resources/text/test1.txt")
    val outputDir = TEST_OUTPUT_DIR + "/text_to_parquet"
    Writer.write(sqlContext, inputDF, "parquet", outputDir, null, SaveMode.ErrorIfExists)
    assert(new File(outputDir).exists())
  }

} 
Author: rssanders3 | Project: spark-data-format-converter | Lines: 54 | Source file: WriteTest.scala


Example 16: MainTest

// Package declaration and imported dependencies
package com.github.rssanders3.spark.data_format_converter

import java.io.File

import com.github.rssanders3.spark.data_format_converter.common.TestUtilFunctions
import com.github.rssanders3.spark.data_format_converter.utils.{Reader, Writer}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, GivenWhenThen, Matchers}


class MainTest extends FlatSpec with GivenWhenThen with Matchers with BeforeAndAfterAll {

  private val MASTER = "local[2]"
  private val APP_NAME = this.getClass.getSimpleName
  private val TEST_OUTPUT_DIR = "src/test/resources/test_output/MainTest"

  private var _sc: SparkContext = _
  private var _sqlContext: SQLContext = _

  def sc = _sc
  def sqlContext = _sqlContext

  TestUtilFunctions.deleteTestOutputDirContents(TEST_OUTPUT_DIR)

  val conf: SparkConf = new SparkConf()
    .setMaster(MASTER)
    .setAppName(APP_NAME)

  override def beforeAll(): Unit = {
    super.beforeAll()
    _sc = new SparkContext(conf)
    _sqlContext = new SQLContext(_sc)
  }

  override def afterAll(): Unit = {
    if (_sc != null) {
      _sc.stop()
      _sc = null
    }
    super.afterAll()
  }

  "Importing as text and exporting as parquet" should "work" in {
    val inputDF = Reader.read(sqlContext, "src/test/resources/text/test1.txt", null, "text")
    val outputDir = TEST_OUTPUT_DIR + "/text_to_parquet"
    Writer.write(sqlContext, inputDF, "parquet", outputDir, null, SaveMode.ErrorIfExists)
    assert(new File(outputDir).exists())
  }

} 
Author: rssanders3 | Project: spark-data-format-converter | Lines: 52 | Source file: MainTest.scala


Example 17: OffLineMerge

// Package declaration and imported dependencies
package com.paypal.risk.madmen20

import com.paypal.risk.madmen20.util.HdfsUtil
import com.paypal.risk.madmen20.util.MergeParquetMaps._
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer


object OffLineMerge {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("DataFrame-OfflineMerge").setMaster("yarn-client")
    implicit val sc = new SparkContext(conf)
    implicit val sqlc = new SQLContext(sc)

    val var_str=args(0)
    val is_Delta = var_str.split(' ').apply(0)
    val dt  = var_str.split(' ').apply(1)
    val append_path="/apps/risk/det/madmen20/bre/source=offline" + "/date=" + dt
    val store_path = "/apps/risk/det/madmen20/bre/source=offline.new" + "/date=" + dt
    val needed_sources: Array[String] = Array("base", "boost_prime", "boost_networks", "ars")
    val path_list = new ArrayBuffer[String]
    for (s <- needed_sources) {
      val file_path = "/apps/risk/det/madmen20/stage/" + is_Delta + "/source=" + s + "/date=" + dt
      path_list.append(file_path)
    }
    val ext_list = path_list.filter(path => HdfsUtil.exists(path)).toArray
    ext_list.foreach(println)
    val dfs = new Array[DataFrame](ext_list.size)
    var i = 0
    for (s <- ext_list) {
      dfs(i) = sqlc.read.parquet(s)
      i = i + 1
    }
    val final_df = mergeMultiDataFrameWithMap(dfs)

    is_Delta match {
      case "bre"   => final_df.write.mode(SaveMode.Overwrite).save(store_path)
      case "delta" => final_df.write.mode(SaveMode.Append).save(append_path)
        val Appended_DF = sqlc.read.load(append_path)
        Appended_DF.write.mode(SaveMode.Overwrite).save(store_path)
      case _ => print("Negative!")
    }
    val final_json = getSchemaDataFrameWithVars(final_df)
    val final_json_str = final_json.toString()
    HdfsUtil.write(store_path + "/_schema_info.json", final_json_str)
  }

} 
Author: yanlzhang8936 | Project: madmen20 | Lines: 51 | Source file: OffLineMerge.scala


Example 18: LiveAppend

// Package declaration and imported dependencies
package com.paypal.risk.madmen20

import com.paypal.risk.madmen20.util.HdfsUtil
import org.apache.spark.sql.{SaveMode, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}
import com.paypal.risk.madmen20.util.MergeParquetMaps._


object LiveAppend {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MadMEn20-LiveAppend").setMaster("yarn-client")
    implicit val sc = new SparkContext(conf)
    implicit val sqlc = new SQLContext(sc)

    val dt = args(0)

    val stagePath = "/apps/risk/det/madmen20/stage/bre/source=live/date=" + dt
    val targetPath = "/apps/risk/det/madmen20/bre/source=live/date=" + dt
    val outPutPath = targetPath + ".new"
    val input_pq = sqlc.read.parquet(stagePath)

    input_pq.write.mode(SaveMode.Append).save(targetPath)
    val base_pq = sqlc.read.parquet(targetPath)
    base_pq.write.mode(SaveMode.ErrorIfExists).save(outPutPath)

    val final_json = getSchemaDataFrameWithVars(base_pq)
    val final_json_str = final_json.toString()
    HdfsUtil.write(outPutPath + "/_schema_info.json", final_json_str)

//    val join_df = fullOuterJoin(base_pq,input_pq)
//    join_df.write.mode(SaveMode.Overwrite).save(outPutPath)
  }

} 
Author: yanlzhang8936 | Project: madmen20 | Lines: 35 | Source file: LiveAppend.scala


Example 19: ReStoreFile

// Package declaration and imported dependencies
package com.paypal.risk.madmen20

import com.paypal.risk.madmen20.util.MergeParquetMaps._
import org.apache.spark.sql.{SaveMode, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import com.paypal.risk.madmen20.MergeScratchStage._
import com.paypal.risk.madmen20.util.HdfsUtil._


object ReStoreFile {

  def main(args: Array[String]) {
    implicit val sc = new SparkContext(new SparkConf().setAppName("DataFrame-Restore").setMaster("yarn-client"))
    //    val sc = new SparkContext(new SparkConf().setAppName("DataFrame-MergeScratchStage").setMaster("local"))
    implicit val sqlc = new SQLContext(sc)

    val orig_folder=args(0)
//    val group = var_str.split(' ').apply(0)
//    val dt  = var_str.split(' ').apply(1)
//    val tmp_golden_path = "hdfs://stampy/apps/risk/det/madmen20/golden/source=golden"
    println("orig_folder is : " + orig_folder)
    val golden_path_suffix = "hdfs://stampy/apps/risk/det/madmen20/bre/source=golden"
    val path_list = globPath(orig_folder)
    println(path_list)
    path_list.map{ path =>
      val date_path = path.split("/").filter(_.contains("date")).apply(0)
      val golden_date_path = golden_path_suffix + "/" + date_path
      val pq_file = sqlc.read.parquet(path)
      if (exists(golden_date_path)) {
        val golden_orig_pq = sqlc.read.parquet(golden_date_path)
        val join_df = mergeTwoDataFrameWithMap(pq_file, golden_orig_pq, "outer")
        join_df.write.mode(SaveMode.Overwrite).save(golden_date_path)
        path
      }
      else {
        pq_file.write.mode(SaveMode.Overwrite).save(golden_date_path)
        path
      }
    }

  }
} 
Author: yanlzhang8936 | Project: madmen20 | Lines: 43 | Source file: ReStoreFile.scala


Example 20: GitHubDay

// Package declaration and imported dependencies
package com.example

import org.apache.spark.sql.SaveMode

import scala.io.Source.fromFile

object GitHubDay extends App {
  
  import org.apache.spark.sql.SparkSession
  
  implicit val spark: SparkSession =
    SparkSession
    .builder()
    //.appName("Github push counter")
    //.master("local[*]")
    .getOrCreate()
  import spark.implicits._
  
  val inputPath = args(0) // "../data/ch03githubarchive/*.json"
  val ghLog = spark.read.json(inputPath).cache()
  val pushes = ghLog.filter("type = 'PushEvent'").cache()
  
  pushes.printSchema()
  println("all events:" + ghLog.count)
  println("only pushes:" + pushes.count)
  pushes.show(5)
  
  val grouped = pushes.groupBy("actor.login").count.cache()
  grouped.show(5)
  val ordered = grouped.orderBy(grouped("count").desc)
  ordered.show(5)
  
  val empPath = args(1) // "../first-edition/ch03/ghEmployees.txt"
  val employees = Set() ++ (
    for {
      line <- fromFile(empPath).getLines
    } yield line.trim
  )
  val bcEmployees = spark.sparkContext.broadcast(employees)
  
  val isEmp = (user: String) => bcEmployees.value contains user
  val isEmployee = spark.udf.register("isEmpUDF", isEmp)
  
  val filtered = ordered.filter(isEmployee($"login"))
  filtered.write.mode(SaveMode.Overwrite).format(args(3)).save(args(2))
  
  spark.stop()
  
} 
Author: shafiquejamal | Project: spark-in-action-ch03 | Lines: 50 | Source file: GitHubDay.scala



Note: The org.apache.spark.sql.SaveMode class examples in this article were collected from GitHub, MSDocs, and similar source-code and documentation platforms, and the snippets were selected from open-source projects contributed by their developers. Copyright of the source code remains with the original authors; for distribution and use, refer to the corresponding project's license, and do not reproduce without permission.

