本文整理汇总了Scala中org.apache.spark.sql.SaveMode类的典型用法代码示例。如果您正苦于以下问题:Scala SaveMode类的具体用法?Scala SaveMode怎么用?Scala SaveMode使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SaveMode类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: DatabaseBackup
//设置package包名称以及导入依赖的类
package unus.stage
import unus.helpers.Conf
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
import scala.reflect.runtime.universe._
import org.apache.spark.sql.SaveMode
/**
 * Backs up a single JDBC (PostgreSQL) table to CSV and restores it back,
 * using the Spark session and connection settings held in `Conf`.
 *
 * @param table the database table name (quoted verbatim into `dbtable`)
 * @tparam T case class describing the table row; its reflected schema is
 *           applied when reading the CSV back in `load()`
 */
class DatabaseBackup[T: TypeTag](table: String) {
  // Spark schema derived from the case class T via runtime reflection.
  private lazy val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]

  /** Dumps the whole JDBC table into a headered CSV file under `Conf.dataDir`. */
  def save(): Unit = {
    Conf.spark.read
      .format("jdbc")
      .option("url", Conf.dbUrl)
      .option("dbtable", s""""$table"""")
      .option("user", Conf.dbUsername)
      .option("password", Conf.dbPassword)
      // FIX: Spark's JDBC source expects the option key "driver"; the original
      // key "jdbcdriver" is not a recognised option and was silently ignored,
      // so the PostgreSQL driver class was never registered explicitly.
      .option("driver", "org.postgresql.Driver")
      .load()
      .write
      .format("csv")
      .option("header", "true")
      .save(Conf.dataDir + "/" + table + ".csv")
  }

  /**
   * Loads a gzipped CSV dump back into the JDBC table, appending rows and
   * applying the reflected schema of `T`.
   *
   * NOTE(review): `save()` writes `<table>.csv` while `load()` reads
   * `<table>.csv.gz` — presumably the dump is compressed out of band; confirm.
   */
  def load(): Unit = {
    Conf.spark.read
      .format("csv")
      .option("header", "true")
      .schema(schema)
      .load(Conf.dataDir + "/" + table + ".csv.gz")
      .write
      .format("jdbc")
      .option("url", Conf.dbUrl)
      .option("dbtable", s""""$table"""")
      .option("user", Conf.dbUsername)
      .option("password", Conf.dbPassword)
      // Same fix as in save(): "driver" is the documented option key.
      .option("driver", "org.postgresql.Driver")
      .mode(SaveMode.Append)
      .save()
  }
}
开发者ID:mindfulmachines,项目名称:unus,代码行数:44,代码来源:DatabaseBackup.scala
示例2: ParquetS3Backup
//设置package包名称以及导入依赖的类
package com.unity.analytics.spark.utils.parquet
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{Logging, SparkConf, SparkContext}
/**
 * Merges or splits Parquet datasets (typically on S3) to control the number
 * of output files per directory.
 */
object ParquetS3Backup extends Logging {
  implicit val formats = org.json4s.DefaultFormats

  def main(args: Array[String]): Unit = {
    val config = new ParquetS3BackupConfiguration(args)
    val sparkConf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sqlContext = new SQLContext(new SparkContext(sparkConf))
    // IDIOM: the original pattern-matched on a Boolean (`match { case true ... }`);
    // a plain if/else expresses the same dispatch directly.
    if (config.merge()) {
      merge(sqlContext, config.srcDir(), config.destDir(), config.numFiles())
    } else {
      split(sqlContext, config.srcDir(), config.destDir(), config.numFiles())
    }
  }

  /** Reads srcDir, coalesces down to destNumFiles, and overwrites destDir. */
  def merge(sqlContext: SQLContext, srcDir: String, destDir: String, destNumFiles: Int): Unit = {
    logInfo(s"ParquetS3Backup merge - srcDir: $srcDir, destDir: $destDir, destNumFiles: $destNumFiles")
    sqlContext.read.parquet(srcDir)
      .coalesce(destNumFiles)
      .write.mode(SaveMode.Overwrite).parquet(destDir)
  }

  /** Reads srcDir, repartitions up to destNumFiles (incurs a shuffle), and overwrites destDir. */
  def split(sqlContext: SQLContext, srcDir: String, destDir: String, destNumFiles: Int): Unit = {
    logInfo(s"ParquetS3Backup split - srcDir: $srcDir, destDir: $destDir, destNumFiles: $destNumFiles")
    sqlContext.read.parquet(srcDir)
      .repartition(destNumFiles)
      .write.mode(SaveMode.Overwrite).parquet(destDir)
  }

  /**
   * Backs up every entry in backupMetadata from srcDir to destDir at the
   * requested file count, choosing merge (no shuffle) when shrinking.
   */
  def backup(sqlContext: SQLContext, backupMetadata: BackupMetadata): Unit = {
    backupMetadata.backupEntries.foreach(backupEntry => {
      if (backupEntry.destNumFiles <= backupEntry.srcNumFiles) {
        merge(sqlContext, backupEntry.srcDir, backupEntry.destDir, backupEntry.destNumFiles)
      } else {
        split(sqlContext, backupEntry.srcDir, backupEntry.destDir, backupEntry.destNumFiles)
      }
    })
  }

  /** Inverse of backup(): restores destDir back to srcDir at the original file count. */
  def restore(sqlContext: SQLContext, backupMetadata: BackupMetadata): Unit = {
    backupMetadata.backupEntries.foreach(backupEntry => {
      if (backupEntry.srcNumFiles <= backupEntry.destNumFiles) {
        merge(sqlContext, backupEntry.destDir, backupEntry.srcDir, backupEntry.srcNumFiles)
      } else {
        split(sqlContext, backupEntry.destDir, backupEntry.srcDir, backupEntry.srcNumFiles)
      }
    })
  }
}
开发者ID:UnityTech,项目名称:parquet-s3-backup,代码行数:59,代码来源:ParquetS3Backup.scala
示例3: TwitterEmoParser
//设置package包名称以及导入依赖的类
package com.aluxian.tweeather.scripts
import com.aluxian.tweeather.RichBoolean
import org.apache.spark.Logging
import org.apache.spark.sql.SaveMode
/**
 * Parses collected tweets and labels each one by emoticon polarity
 * (1.0 = positive, 0.0 = negative), writing the result as Parquet.
 */
object TwitterEmoParser extends Script with Logging {

  // Emoticon dictionaries shared with the collector script.
  val positiveEmoticons = TwitterEmoCollector.positiveEmoticons
  val negativeEmoticons = TwitterEmoCollector.negativeEmoticons

  override def main(args: Array[String]) {
    super.main(args)
    import sqlc.implicits._

    // Import data
    logInfo("Parsing text files")
    val labelled = sc.textFile("tw/sentiment/emo/*.gz")
      .coalesce(99)
      .map(_.stripPrefix("RT").trim)
      .distinct()
      .filter(line => !line.startsWith("Collected") && !line.startsWith("collected"))
      .flatMap { text =>
        val positive = positiveEmoticons.exists(text.contains)
        val negative = negativeEmoticons.exists(text.contains)
        // Keep only tweets carrying exactly one polarity of emoticon.
        if (positive ^ negative) Some((text, positive.toDouble)) else None
      }

    logInfo("Saving text files")
    labelled.toDF("raw_text", "label").write.mode(SaveMode.Overwrite)
      .parquet("tw/sentiment/emo/parsed/data.parquet")

    logInfo("Parsing finished")
    sc.stop()
  }
}
开发者ID:cnajeefa,项目名称:Tourism-Sentiment-Analysis,代码行数:41,代码来源:TwitterEmoParser.scala
示例4: StoreFormat
//设置package包名称以及导入依赖的类
package com.sasaki.utils
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.rdd.RDD
import com.sasaki.discretization._
/** Helpers for converting role/mark/seqs row RDDs into DataFrames and JSON files. */
object StoreFormat {

  /** Wraps an RDD of rows into a DataFrame with the fixed (role, mark, seqs) schema. */
  def rdd2DF(rdd : RDD[Row], sqlContext : SQLContext) = {
    val fields = Seq(
      StructField("role", StringType, nullable = false),
      StructField("mark", StringType, nullable = false),
      StructField("seqs", ArrayType(StringType), nullable = false))
    sqlContext.createDataFrame(rdd, StructType(fields))
  }

  /** Persists the rows as JSON at `path`; SaveMode.Ignore skips existing output. */
  def saveAsJSON(rdd : RDD[Row],
      path : String, sqlContext : SQLContext) = {
    val frame = rdd2DF(rdd, sqlContext)
    val saveOptions = Map("header" -> "false", "path" -> path)
    frame.write
      .format("json")
      .mode(SaveMode.Ignore)
      .options(saveOptions)
      .save
  }
}
开发者ID:sasakigao,项目名称:log-discrete,代码行数:28,代码来源:StoreFormat.scala
示例5: DefaultSource
//设置package包名称以及导入依赖的类
package com.rishabh.spark.datasource.s3
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
/**
 * Spark data-source provider for reading/writing JSON, Parquet, and CSV on S3
 * via the s3a filesystem. Credentials and location come from the options map.
 */
class DefaultSource extends RelationProvider with CreatableRelationProvider {

  /** Read path: builds an S3Relation from the mandatory connection parameters. */
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]):
  BaseRelation = {
    val accessKey = parameters.getOrElse("accesskey", sys.error("accesskey is required"))
    val secretKey = parameters.getOrElse("secretkey", sys.error("secretkey is required"))
    val fileType = parameters.getOrElse("type", sys.error("filetype is required"))
    val path = parameters.getOrElse("path", sys.error("path is required"))
    val bucket = parameters.getOrElse("bucketName", sys.error("bucket is required"))
    new S3Relation(accessKey, secretKey, fileType, bucket, path, false)(sqlContext)
  }

  /**
   * Write path: validates the file type, configures s3a credentials on the
   * Hadoop configuration, saves the DataFrame, and returns a relation over
   * the written data.
   *
   * NOTE(review): the `mode` argument is accepted but never applied to the
   * write — confirm whether SaveMode support is intentionally deferred.
   */
  override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String,
    String], data: DataFrame): BaseRelation = {
    val accesskey = parameters.getOrElse("accesskey", sys.error("accesskey is required"))
    val secretkey = parameters.getOrElse("secretkey", sys.error("secretkey is required"))
    val bucket = parameters.getOrElse("bucketName", sys.error("bucket is required"))
    val fileType = parameters.getOrElse("type", sys.error("filetype is required"))
    val path = parameters.getOrElse("path", sys.error("path is required"))
    val supported = List("json", "parquet", "csv")
    if (!supported.contains(fileType)) {
      sys.error("fileType " + fileType + " not supported.")
    }
    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoopConf.set("fs.s3a.access.key", accesskey)
    hadoopConf.set("fs.s3a.secret.key", secretkey)
    val s3Path = "s3a://" + bucket + path
    doSave(fileType, data, s3Path)
    new S3Relation(accesskey, secretkey, fileType, bucket, path, true)(sqlContext)
  }

  /** Dispatches the write on file type. */
  private def doSave(fileType: String, dataFrame: DataFrame, path: String) = {
    fileType match {
      case "json" =>
        dataFrame.write.json(path)
      case "parquet" =>
        dataFrame.write.parquet(path)
      case "csv" =>
        dataFrame.write.format("com.databricks.spark.csv").save(path)
      // FIX: defensive default — callers validate against `supported`, but an
      // exhaustive match avoids an opaque MatchError if that guard is bypassed.
      case other =>
        sys.error("fileType " + other + " not supported.")
    }
  }
}
开发者ID:rishabhbhardwaj,项目名称:spark-datasource-s3,代码行数:51,代码来源:DefaultSource.scala
示例6: ScorePredictor
//设置package包名称以及导入依赖的类
package org.wikimedia.research.recommendation.job.translation
import java.io.File
import org.apache.log4j.{LogManager, Logger}
import org.apache.spark.ml.regression.RandomForestRegressionModel
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import scala.collection.parallel.mutable.ParArray
// Scores feature rows against one pre-trained RandomForest model per target
// site, then joins all per-site score columns on "id" and writes one CSV.
object ScorePredictor {
val log: Logger = LogManager.getLogger(ScorePredictor.getClass)
// Runs predictions in parallel (sites is a ParArray) and saves the combined
// result. `predictionsOutputDir = None` means "score but do not persist".
def predictScores(spark: SparkSession,
modelsInputDir: File,
predictionsOutputDir: Option[File],
sites: ParArray[String],
featureData: DataFrame): Unit = {
log.info("Scoring items")
// One DataFrame of (id, <target score>) per site; evaluated in parallel.
val predictions: Array[DataFrame] = sites.map(target => {
try {
log.info("Scoring for " + target)
log.info("Getting work data for " + target)
val workData: DataFrame = Utils.getWorkData(spark, featureData, target, exists = false)
log.info("Loading model for " + target)
// Model directory layout is <modelsInputDir>/<target>.
val model = RandomForestRegressionModel.load(
new File(modelsInputDir, target).getAbsolutePath)
log.info("Scoring data for " + target)
val predictions = model
.setPredictionCol(target)
.transform(workData)
.select("id", target)
predictions
} catch {
// NOTE(review): catching Throwable (not NonFatal) also swallows fatal
// errors such as OutOfMemoryError — confirm this is intentional.
case unknown: Throwable =>
log.error("Score for " + target + " failed", unknown)
// Substitute an empty (id, target) frame so the later outer join still
// has a column for this site.
val schema = StructType(Seq(
StructField("id", StringType, nullable = false),
StructField(target, DoubleType, nullable = true)))
spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
}
}).toArray
// Full outer join keeps ids that were scored for only a subset of sites.
val predictedScores = predictions.reduce((left, right) => left.join(right, Seq("id"), "outer"))
log.info("Saving predictions")
// coalesce(1) produces a single CSV part file; ErrorIfExists refuses to
// overwrite a previous run's output.
predictionsOutputDir.foreach(f = o =>
predictedScores.coalesce(1)
.write
.mode(SaveMode.ErrorIfExists)
.option("header", value = true)
.option("compression", "bzip2")
.csv(new File(o, "allPredictions").getAbsolutePath))
}
}
开发者ID:schana,项目名称:recommendation-translation,代码行数:59,代码来源:ScorePredictor.scala
示例7: Average
//设置package包名称以及导入依赖的类
package nl.techdays.bigdataprocessing.demo03
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.sql.hive.HiveContext
// One aggregate row: a dimension and the arithmetic mean of its values.
case class Average(dimension: String, average: Double)

object Program {
  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("adl-sample-app")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    val measurements = sqlContext.sql("SELECT * FROM measurements")

    // FIX: the original reduced with `(left + right) / 2`, which computes a
    // pairwise average, NOT the arithmetic mean — the result depends on the
    // (nondeterministic) reduction order and weights values unequally.
    // Aggregate (sum, count) per dimension and divide once at the end.
    measurements
      .map(x => (x.getAs[String]("dimension"), (x.getAs[Double]("value"), 1L)))
      .reduceByKey { case ((sum1, count1), (sum2, count2)) =>
        (sum1 + sum2, count1 + count2)
      }
      .map { case (dimension, (sum, count)) => Average(dimension, sum / count) }
      .toDF()
      .write.mode(SaveMode.Append).saveAsTable("averages")
  }
}
开发者ID:wmeints,项目名称:techdays2016,代码行数:27,代码来源:Program.scala
示例8: DeputadosService
//设置package包名称以及导入依赖的类
package com.nakamura.camara.deputados
import com.nakamura.camara.deputados.deputado.{Deputado}
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.util.{Failure, Success, Try}
import scalaj.http.{Http, HttpResponse}
// Fetches the list of deputados (members of parliament) from the Camara SOAP
// endpoint and persists them as JSON datasets via Spark.
class DeputadosService(spark: SparkSession) {
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
private val logger = org.log4s.getLogger
private val obterDeputadosEndpoint = "http://www.camara.leg.br/SitCamaraWS/Deputados.asmx/ObterDeputados"
// Calls the endpoint and parses each <deputado> XML node into a Deputado.
// Returns Failure on a non-2xx response or if the XML parsing throws.
def getDeputados(): Try[Seq[Deputado]] = {
logger.info(s"Sending request for deputados...")
val response: HttpResponse[String] = Http(obterDeputadosEndpoint).postForm.asString
if (response.isSuccess) {
Try {
val xml = scala.xml.XML.loadString(response.body)
val deputadosNode = xml \\ "deputado"
val deputados = deputadosNode.map(DeputadoUtils.fromXml)
logger.info(s"Found ${deputados.length} deputados")
deputados
}
} else {
Failure(new Error("Request failed."))
}
}
// Fetches deputados and writes them as JSON under ./data/deputados/.
// Default ErrorIfExists refuses to clobber a previous export; failures are
// logged rather than rethrown.
def fetchAndStoreDeputadosData(saveMode: SaveMode = SaveMode.ErrorIfExists): Unit = {
getDeputados() match {
case Success(deputados) =>
val deputadossDs = deputados.toDS()
deputadossDs.write.mode(saveMode)
.format("json")
.save(s"./data/deputados/")
case Failure(err) =>
logger.error(s"Failed to save deputados with $err")
}
}
}
开发者ID:akionakamura,项目名称:camara,代码行数:45,代码来源:DeputadosService.scala
示例9: ProposicoesServiceTest
//设置package包名称以及导入依赖的类
package com.nakamura.camara.proposicoes
import com.nakamura.camara.proposicoes.proposicao.ListarProposicoesRequest
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.junit.Test
import org.scalatest.Assertions
/** Integration tests for ProposicoesService against the live Camara API. */
class ProposicoesServiceTest extends Assertions {

  // Local Spark session shared by every test in this class.
  private val spark = SparkSession
    .builder()
    .appName("ProposicoesServiceTest")
    .master("local[*]")
    .getOrCreate()

  private val service = new ProposicoesService(spark)

  @Test
  def testListarProposicoes(): Unit = {
    // A well-formed request must come back as a non-empty Success.
    val request = ListarProposicoesRequest(ano = 2017, sigla = "PEC")
    val result = service.listarProposicoes(request)
    assert(result.isSuccess)
    assert(result.get.nonEmpty)
  }

  @Test
  def testListarProposicoesFailure(): Unit = {
    // An empty request is invalid and must come back as a Failure.
    val invalidRequest = ListarProposicoesRequest()
    val result = service.listarProposicoes(invalidRequest)
    assert(result.isFailure)
  }

  @Test
  def testListarSiglasProposicoes(): Unit = {
    // The list of proposal-type acronyms must be available and non-empty.
    val result = service.listSiglasTipoProposioes()
    assert(result.isSuccess)
    assert(result.get.nonEmpty)
  }

  @Test
  def runFetchAndStoreHistoricalData(): Unit = {
    // Smoke test: persist 2010-2017, skipping partitions that already exist.
    service.fetchAndStoreHistoricalData(2010 to 2017 by 1, SaveMode.Ignore)
  }
}
开发者ID:akionakamura,项目名称:camara,代码行数:43,代码来源:ProposicoesServiceTest.scala
示例10: DeputadosServiceTest
//设置package包名称以及导入依赖的类
package com.nakamura.camara.deputados
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.junit.Test
import org.scalatest.Assertions
/** Integration tests for DeputadosService against the live Camara API. */
class DeputadosServiceTest extends Assertions {

  // Local Spark session shared by every test in this class.
  private val spark = SparkSession
    .builder()
    .appName("DeputadosServiceTest")
    .master("local[*]")
    .getOrCreate()

  private val service = new DeputadosService(spark)

  @Test
  def testListarSiglasProposicoes(): Unit = {
    val deputadosTry = service.getDeputados()
    assert(deputadosTry.isSuccess)
    assert(deputadosTry.get.nonEmpty)
  }

  @Test
  def testFetchAndStoreDeputadosData(): Unit = {
    // FIX: fetchAndStoreDeputadosData returns Unit; binding its result to an
    // unused `val deputadosTry` was misleading (it suggested a Try was being
    // inspected). Call it purely for its side effect.
    service.fetchAndStoreDeputadosData(SaveMode.Ignore)
  }
}
开发者ID:akionakamura,项目名称:camara,代码行数:26,代码来源:DeputadosServiceTest.scala
示例11: DefaultSource
//设置package包名称以及导入依赖的类
package com.springml.spark.zuora
import com.springml.spark.zuora.model.ZuoraInput
import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType
import scala.collection.mutable
/**
 * Spark data-source provider for Zuora: executes a ZOQL query over the REST
 * API and exposes the records as a read-only relation (writes unsupported).
 */
class DefaultSource extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
  @transient val logger = Logger.getLogger(classOf[DefaultSource])

  /** Read path without a user-supplied schema (schema inferred downstream). */
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation = {
    createRelation(sqlContext, parameters, null)
  }

  /** Read path: builds the Zuora request from options and fetches all records. */
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String],
                              schema: StructType): BaseRelation = {
    val email = param(parameters, "email")
    val password = param(parameters, "password")
    val zoql = param(parameters, "zoql")
    val instanceUrl = parameters.getOrElse("instanceURL", "https://rest.zuora.com")
    val apiVersion = parameters.getOrElse("apiVersion", "38.0")
    // TODO
    val pageSizeParam = parameters.getOrElse("pageSize", "1000")
    val pageSize = pageSizeParam.toInt
    val zuoraInput = new ZuoraInput(email, password, zoql, instanceUrl, apiVersion, pageSize)
    val records = new ZuoraReader(zuoraInput) read()
    new DatasetRelation(records, sqlContext, schema)
  }

  /** Write path: Zuora is read-only through this connector. */
  override def createRelation(sqlContext: SQLContext,
                              mode: SaveMode,
                              parameters: Map[String, String],
                              data: DataFrame): BaseRelation = {
    logger.error("Save not supported by Zuora connector")
    throw new UnsupportedOperationException
  }

  /** Fetches a mandatory parameter, failing with a clear message if absent. */
  private def param(parameters: Map[String, String],
                    paramName: String) : String = {
    val paramValue = parameters.getOrElse(paramName,
      sys.error(s"""'$paramName' must be specified for Spark Zuora package"""))
    // SECURITY FIX: the original condition was inverted — it logged the value
    // ONLY when the parameter was "password", writing the secret to the debug
    // log. Log every parameter EXCEPT the password.
    if (!"password".equals(paramName)) {
      logger.debug("Param " + paramName + " value " + paramValue)
    }
    paramValue
  }
}
开发者ID:springml,项目名称:spark-zuora,代码行数:60,代码来源:DefaultSource.scala
示例12: TwitterEmoCountryParser
//设置package包名称以及导入依赖的类
package com.aluxian.tweeather.scripts
import com.aluxian.tweeather.RichBoolean
import org.apache.spark.Logging
import org.apache.spark.sql.SaveMode
// Parses per-country tweet dumps and labels sentiment by emoticon polarity,
// writing (country_code, time_stamp, raw_text, label) rows as Parquet.
object TwitterEmoCountryParser extends Script with Logging {
// Emoticon dictionaries shared with the collector script.
val positiveEmoticons = TwitterEmoCollector.positiveEmoticons
val negativeEmoticons = TwitterEmoCollector.negativeEmoticons
override def main(args: Array[String]) {
super.main(args)
import sqlc.implicits._
// Import data
//for neutral sentiment do (hasPositive & hasNegative)
logInfo("Parsing text files")
val data = sc.textFile("tw/sentiment/emoByCountry/*.tar.gz")
//.coalesce(sc.defaultParallelism)
.coalesce(99)
.map(_.stripPrefix("RT").trim)
.distinct()
.filter(!_.startsWith("Collected"))
.filter(!_.startsWith("collected"))
.map(text => {
val hasPositive = positiveEmoticons.exists(text.contains)
val hasNegative = negativeEmoticons.exists(text.contains)
// NOTE(review): this builds a 2-element "||"-joined string (text, label)...
if (hasPositive ^ hasNegative) Seq(text, hasPositive.toDouble).mkString("||") else null
})
.filter(_ != null)
.map(_.split("\\|\\|"))
// NOTE(review): ...but this step indexes row(0)..row(3), expecting FOUR
// fields (country, timestamp, text, label). Unless the input text itself
// already contains "country||timestamp||" prefixes, this will throw
// ArrayIndexOutOfBoundsException at runtime — verify the input format.
.map(row => (row(0), parseLong(row(1)).getOrElse(0L), row(2), parseDouble(row(3)).getOrElse(-1.0)))
// NOTE(review): row._1 is a String here, so `row._1 != -1.0` is always
// true (String vs Double); the comment suggests row._4 (the label) was
// the intended field. This filter currently removes nothing.
.filter(row => row._1 != -1.0) //remove rows that do not convert to 0/1 for sentiment_label
logInfo("Saving text files")
data.toDF("country_code", "time_stamp", "raw_text", "label").write.mode(SaveMode.Overwrite)
.parquet("tw/sentiment/emoByCountry/parsed/data.parquet")
logInfo("Parsing finished")
sc.stop()
}
// Safe string-to-Long conversion; None on malformed input.
def parseLong(str: String):Option[Long] = {
try {
Some(str.toLong)
} catch {
case e: NumberFormatException => None
}
}
// Safe string-to-Double conversion; None on malformed input.
def parseDouble(str: String):Option[Double] = {
try {
Some(str.toDouble)
} catch {
case e: NumberFormatException => None
}
}
}
开发者ID:cnajeefa,项目名称:Tourism-Sentiment-Analysis,代码行数:62,代码来源:TwitterEmoCountryParser.scala
示例13: Sentiment140Parser
//设置package包名称以及导入依赖的类
package com.aluxian.tweeather.scripts
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SaveMode
/**
 * Parses the Sentiment140 CSV corpus into (raw_text, label) Parquet files,
 * where label is 0.0 (negative) or 1.0 (positive).
 */
object Sentiment140Parser extends Script with Logging {

  override def main(args: Array[String]) {
    super.main(args)

    // Import data
    val testData = sc.textFile("tw/sentiment/140/downloaded/testdata.manual.2009.06.14.csv")
    val trainingData = sc.textFile("tw/sentiment/140/downloaded/training.1600000.processed.noemoticon.csv")

    logInfo(s"Parsing test dataset")
    parse(testData, "tw/sentiment/140/parsed/test.parquet")

    logInfo(s"Parsing training dataset")
    parse(trainingData, "tw/sentiment/140/parsed/training.parquet")

    logInfo("Parsing finished")
    sc.stop()
  }

  /** Cleans one raw CSV dump and writes it as (raw_text, label) Parquet. */
  def parse(data: RDD[String], filePath: String) {
    val rows = data
      .filter(_.contains("\",\""))                        // ensure correct format
      .map(_.split("\",\"").map(_.replace("\"", "")))     // split columns, strip quote marks
      .filter(cols => cols.forall(_.nonEmpty))            // drop rows with empty columns
      .map(cols => (cols(0).toDouble, cols(5)))           // keep (sentiment, text) only
      .filter { case (sentiment, _) => sentiment != 2 }   // remove neutral tweets
      .map { case (sentiment, text) => (text, sentiment / 4) } // normalize to [0,1] and reorder

    import sqlc.implicits._
    rows.toDF("raw_text", "label").write.mode(SaveMode.Overwrite).parquet(filePath)
    logInfo(s"Parsed and saved $filePath")
  }
}
开发者ID:cnajeefa,项目名称:Tourism-Sentiment-Analysis,代码行数:43,代码来源:Sentiment140Parser.scala
示例14: Writer
//设置package包名称以及导入依赖的类
package com.github.rssanders3.spark.data_format_converter.utils
import com.github.rssanders3.spark.data_format_converter.MainArgs.JobArgs
import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
/** Writes a DataFrame to a file path or a table in the requested output format. */
object Writer {

  /** Convenience overload: unpacks the destination settings from JobArgs. */
  def write(sqlContext: SQLContext, inputDF: DataFrame, jobArgs: JobArgs): Unit = {
    write(sqlContext = sqlContext,
      inputDF = inputDF,
      outputDataType = jobArgs.outputDataType,
      outputFilePath = jobArgs.outputFilePath,
      outputTableName = jobArgs.outputTableName,
      saveMode = jobArgs.getSaveMode()
    )
  }

  /**
   * Writes inputDF in `outputDataType` format to `outputFilePath` (preferred)
   * or `outputTableName`; exactly one of the two must be non-null.
   *
   * @throws IllegalArgumentException when neither destination is provided
   */
  def write(sqlContext: SQLContext, inputDF: DataFrame, outputDataType: String, outputFilePath: String, outputTableName: String, saveMode: SaveMode = SaveMode.Overwrite): Unit = {
    // IDIOM: replaced the original `var` + conditional reassignment with a
    // single val bound to an if-expression (immutable, same behavior).
    val baseWriter = inputDF.write.format(outputDataType)
    val outputDFWriter = if (saveMode != null) baseWriter.mode(saveMode) else baseWriter
    if (outputFilePath != null) {
      outputDFWriter.save(outputFilePath)
    } else if (outputTableName != null) {
      outputDFWriter.saveAsTable(outputTableName)
    } else {
      throw new IllegalArgumentException("Output information has not been provided")
    }
  }
}
开发者ID:rssanders3,项目名称:spark-data-format-converter,代码行数:37,代码来源:Writer.scala
示例15: WriteTest
//设置package包名称以及导入依赖的类
package com.github.rssanders3.spark.data_format_converter.utils
import java.io.File
import com.github.rssanders3.spark.data_format_converter.common.TestUtilFunctions
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, GivenWhenThen, Matchers}
// End-to-end test for Writer: reads a text file and writes it out as Parquet
// on a local SparkContext, asserting the output directory is created.
class WriteTest extends FlatSpec with GivenWhenThen with Matchers with BeforeAndAfterAll {
private val MASTER = "local[2]"
private val APP_NAME = this.getClass.getSimpleName
private val TEST_OUTPUT_DIR = "src/test/resources/test_output/WriteTest"
// Spark context/SQL context are created in beforeAll and torn down in afterAll.
private var _sc: SparkContext = _
private var _sqlContext: SQLContext = _
def sc = _sc
def sqlContext = _sqlContext
val conf: SparkConf = new SparkConf()
.setMaster(MASTER)
.setAppName(APP_NAME)
// Clean previous run's output so SaveMode.ErrorIfExists below can succeed.
TestUtilFunctions.deleteTestOutputDirContents(TEST_OUTPUT_DIR)
override def beforeAll(): Unit = {
super.beforeAll()
_sc = new SparkContext(conf)
_sqlContext = new SQLContext(_sc)
}
override def afterAll(): Unit = {
if (_sc != null) {
_sc.stop()
_sc = null
}
super.afterAll()
}
"Importing as text and exporting as parquet" should "work" in {
// Leftover scaffolding from an earlier DataFrame-construction approach:
// val inputList: java.util.List[WriteTestObject] = new java.util.ArrayList[WriteTestObject]()
// inputList.add(new WriteTestObject("key1", "value1"))
// val inputDF = sqlContext.createDataFrame(inputList, WriteTestObject.getClass)
val inputDF = sqlContext.read.text("src/test/resources/text/test1.txt")
val outputDir = TEST_OUTPUT_DIR + "/text_to_parquet"
Writer.write(sqlContext, inputDF, "parquet", outputDir, null, SaveMode.ErrorIfExists)
assert(new File(outputDir).exists())
}
}
开发者ID:rssanders3,项目名称:spark-data-format-converter,代码行数:54,代码来源:WriteTest.scala
示例16: MainTest
//设置package包名称以及导入依赖的类
package com.github.rssanders3.spark.data_format_converter
import java.io.File
import com.github.rssanders3.spark.data_format_converter.common.TestUtilFunctions
import com.github.rssanders3.spark.data_format_converter.utils.{Reader, Writer}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, GivenWhenThen, Matchers}
// End-to-end test of the Reader -> Writer pipeline: text in, Parquet out,
// on a local SparkContext. Mirrors WriteTest but exercises Reader as well.
class MainTest extends FlatSpec with GivenWhenThen with Matchers with BeforeAndAfterAll {
private val MASTER = "local[2]"
private val APP_NAME = this.getClass.getSimpleName
private val TEST_OUTPUT_DIR = "src/test/resources/test_output/MainTest"
// Spark context/SQL context are created in beforeAll and torn down in afterAll.
private var _sc: SparkContext = _
private var _sqlContext: SQLContext = _
def sc = _sc
def sqlContext = _sqlContext
// Clean previous run's output so SaveMode.ErrorIfExists below can succeed.
TestUtilFunctions.deleteTestOutputDirContents(TEST_OUTPUT_DIR)
val conf: SparkConf = new SparkConf()
.setMaster(MASTER)
.setAppName(APP_NAME)
override def beforeAll(): Unit = {
super.beforeAll()
_sc = new SparkContext(conf)
_sqlContext = new SQLContext(_sc)
}
override def afterAll(): Unit = {
if (_sc != null) {
_sc.stop()
_sc = null
}
super.afterAll()
}
"Importing as text and exporting as parquet" should "work" in {
val inputDF = Reader.read(sqlContext, "src/test/resources/text/test1.txt", null, "text")
val outputDir = TEST_OUTPUT_DIR + "/text_to_parquet"
Writer.write(sqlContext, inputDF, "parquet", outputDir, null, SaveMode.ErrorIfExists)
assert(new File(outputDir).exists())
}
}
开发者ID:rssanders3,项目名称:spark-data-format-converter,代码行数:52,代码来源:MainTest.scala
示例17: OffLineMerge
//设置package包名称以及导入依赖的类
package com.paypal.risk.madmen20
import com.paypal.risk.madmen20.util.HdfsUtil
import com.paypal.risk.madmen20.util.MergeParquetMaps._
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
// Merges the per-source offline staging Parquet datasets for one date into a
// single dataset, then writes a combined-schema JSON next to the output.
// args(0) is a single space-separated string: "<bre|delta> <date>".
object OffLineMerge {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("DataFrame-OfflineMerge").setMaster("yarn-client")
implicit val sc = new SparkContext(conf)
implicit val sqlc = new SQLContext(sc)
val var_str=args(0)
// First token selects the mode ("bre" full overwrite vs "delta" append),
// second token is the partition date.
val is_Delta = var_str.split(' ').apply(0)
val dt = var_str.split(' ').apply(1)
val append_path="/apps/risk/det/madmen20/bre/source=offline" + "/date=" + dt
val store_path = "/apps/risk/det/madmen20/bre/source=offline.new" + "/date=" + dt
// Candidate staging sources; only those that actually exist on HDFS are read.
val needed_sources: Array[String] = Array("base", "boost_prime", "boost_networks", "ars")
val path_list = new ArrayBuffer[String]
for (s <- needed_sources) {
val file_path = "/apps/risk/det/madmen20/stage/" + is_Delta + "/source=" + s + "/date=" + dt
path_list.append(file_path)
}
val ext_list = path_list.filter(path => HdfsUtil.exists(path)).toArray
ext_list.foreach(println)
// Load each existing source into its own DataFrame.
val dfs = new Array[DataFrame](ext_list.size)
var i = 0
for (s <- ext_list) {
dfs(i) = sqlc.read.parquet(s)
i = i + 1
}
val final_df = mergeMultiDataFrameWithMap(dfs)
is_Delta match {
// "bre": full refresh — overwrite the .new store directly.
case "bre" => final_df.write.mode(SaveMode.Overwrite).save(store_path)
// "delta": append to the existing partition, then re-read the appended
// partition and materialise it into the .new store.
case "delta" => final_df.write.mode(SaveMode.Append).save(append_path)
val Appended_DF = sqlc.read.load(append_path)
Appended_DF.write.mode(SaveMode.Overwrite).save(store_path)
// NOTE(review): unknown modes only print and still fall through to the
// schema write below — confirm that is acceptable.
case _ => print("Negative!")
}
// Publish the merged schema description alongside the data.
val final_json = getSchemaDataFrameWithVars(final_df)
val final_json_str = final_json.toString()
HdfsUtil.write(store_path + "/_schema_info.json", final_json_str)
}
}
开发者ID:yanlzhang8936,项目名称:madmen20,代码行数:51,代码来源:OffLineMerge.scala
示例18: LiveAppend
//设置package包名称以及导入依赖的类
package com.paypal.risk.madmen20
import com.paypal.risk.madmen20.util.HdfsUtil
import org.apache.spark.sql.{SaveMode, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}
import com.paypal.risk.madmen20.util.MergeParquetMaps._
/**
 * Appends one day's staged live Parquet data onto the cumulative live
 * partition, then republishes the merged partition (plus a schema JSON)
 * into a fresh ".new" directory. args(0) is the partition date.
 */
object LiveAppend {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("MadMEn20-LiveAppend").setMaster("yarn-client")
    implicit val sc = new SparkContext(sparkConf)
    implicit val sqlc = new SQLContext(sc)

    val date = args(0)
    val stagePath = "/apps/risk/det/madmen20/stage/bre/source=live/date=" + date
    val targetPath = "/apps/risk/det/madmen20/bre/source=live/date=" + date
    val outPutPath = targetPath + ".new"

    // Append today's staged rows onto the cumulative target partition.
    val staged = sqlc.read.parquet(stagePath)
    staged.write.mode(SaveMode.Append).save(targetPath)

    // Re-read the merged partition and copy it to the ".new" directory;
    // ErrorIfExists refuses to clobber a previous publication.
    val merged = sqlc.read.parquet(targetPath)
    merged.write.mode(SaveMode.ErrorIfExists).save(outPutPath)

    // Publish the schema description alongside the data.
    val schemaJson = getSchemaDataFrameWithVars(merged)
    HdfsUtil.write(outPutPath + "/_schema_info.json", schemaJson.toString())
    // (An earlier full-outer-join merge strategy was commented out in the
    // original and is intentionally not reproduced here.)
  }
}
开发者ID:yanlzhang8936,项目名称:madmen20,代码行数:35,代码来源:LiveAppend.scala
示例19: ReStoreFile
//设置package包名称以及导入依赖的类
package com.paypal.risk.madmen20
import com.paypal.risk.madmen20.util.MergeParquetMaps._
import org.apache.spark.sql.{SaveMode, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import com.paypal.risk.madmen20.MergeScratchStage._
import com.paypal.risk.madmen20.util.HdfsUtil._
// Restores per-date Parquet files from an origin folder into the golden
// store: merges with the existing golden partition when one exists,
// otherwise writes the file as the new golden partition. args(0) is a glob
// pattern for the origin folder.
object ReStoreFile {
def main(args: Array[String]) {
implicit val sc = new SparkContext(new SparkConf().setAppName("DataFrame-Restore").setMaster("yarn-client"))
// val sc = new SparkContext(new SparkConf().setAppName("DataFrame-MergeScratchStage").setMaster("local"))
implicit val sqlc = new SQLContext(sc)
val orig_folder=args(0)
// Leftover scaffolding from an earlier argument scheme:
// val group = var_str.split(' ').apply(0)
// val dt = var_str.split(' ').apply(1)
// val tmp_golden_path = "hdfs://stampy/apps/risk/det/madmen20/golden/source=golden"
println("orig_folder is : " + orig_folder)
val golden_path_suffix = "hdfs://stampy/apps/risk/det/madmen20/bre/source=golden"
val path_list = globPath(orig_folder)
println(path_list)
// NOTE(review): map is used purely for side effects here and its result is
// discarded; also `.apply(0)` assumes every path contains a "date" segment
// and will throw otherwise — foreach plus a headOption guard would be safer.
path_list.map{ path =>
// Extract the "date=..." segment and locate the matching golden partition.
val date_path = path.split("/").filter(_.contains("date")).apply(0)
val golden_date_path = golden_path_suffix + "/" + date_path
val pq_file = sqlc.read.parquet(path)
if (exists(golden_date_path)) {
// Existing golden partition: outer-merge the restored data into it.
val golden_orig_pq = sqlc.read.parquet(golden_date_path)
val join_df = mergeTwoDataFrameWithMap(pq_file, golden_orig_pq, "outer")
join_df.write.mode(SaveMode.Overwrite).save(golden_date_path)
path
}
else {
// No golden partition yet: the restored file becomes the partition.
pq_file.write.mode(SaveMode.Overwrite).save(golden_date_path)
path
}
}
}
}
开发者ID:yanlzhang8936,项目名称:madmen20,代码行数:43,代码来源:ReStoreFile.scala
示例20: GitHubDay
//设置package包名称以及导入依赖的类
package com.example
import org.apache.spark.sql.SaveMode
import scala.io.Source.fromFile
/**
 * Counts GitHub push events per user from archive JSON, keeps only company
 * employees (listed one login per line in args(1)), and writes the result.
 * args: 0=input JSON glob, 1=employee file, 2=output path, 3=output format.
 */
object GitHubDay extends App {

  import org.apache.spark.sql.SparkSession

  implicit val spark: SparkSession =
    SparkSession
      .builder()
      //.appName("Github push counter")
      //.master("local[*]")
      .getOrCreate()
  import spark.implicits._

  val inputPath = args(0) // "../data/ch03githubarchive/*.json"
  val ghLog = spark.read.json(inputPath).cache()
  val pushes = ghLog.filter("type = 'PushEvent'").cache()
  pushes.printSchema()
  println("all events:" + ghLog.count)
  println("only pushes:" + pushes.count)
  pushes.show(5)

  // Push counts per user, ordered descending.
  val grouped = pushes.groupBy("actor.login").count.cache()
  grouped.show(5)
  val ordered = grouped.orderBy(grouped("count").desc)
  ordered.show(5)

  val empPath = args(1) // "../first-edition/ch03/ghEmployees.txt"
  // FIX: the original never closed the handle returned by fromFile, leaking
  // the underlying file reader; close it once the lines are materialised.
  val employees: Set[String] = {
    val source = fromFile(empPath)
    try source.getLines().map(_.trim).toSet
    finally source.close()
  }

  // Broadcast the employee set so the UDF doesn't serialise it per task.
  val bcEmployees = spark.sparkContext.broadcast(employees)
  val isEmp = (user: String) => bcEmployees.value contains user
  val isEmployee = spark.udf.register("isEmpUDF", isEmp)

  val filtered = ordered.filter(isEmployee($"login"))
  filtered.write.mode(SaveMode.Overwrite).format(args(3)).save(args(2))
  spark.stop()
}
开发者ID:shafiquejamal,项目名称:spark-in-action-ch03,代码行数:50,代码来源:GitHubDay.scala
注:本文中的org.apache.spark.sql.SaveMode类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论