This article collects typical usage examples of the Scala class org.apache.spark.Logging. If you have been wondering what the Scala Logging class is for, how to use it, or what real-world usage looks like, the hand-picked class code examples below should help.
The following shows 20 code examples of the Logging class, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps our system recommend better Scala code examples.
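All 20 examples share the same basic pattern: mix the org.apache.spark.Logging trait into an object or class and call the log helpers it provides (logInfo, logDebug, logWarning, logError). Here is a minimal, hypothetical sketch of that pattern; it assumes Spark 1.x, where org.apache.spark.Logging is still a public trait (in Spark 2.0 the trait was moved to an internal package, so these examples target older Spark versions). The package and application name are invented for illustration:
package com.example.sketch // hypothetical package, not part of any example below

import org.apache.spark.{Logging, SparkConf, SparkContext}

object LoggingSketch extends Logging {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("logging-sketch"))
    logInfo("SparkContext started")                            // methods provided by the Logging trait
    logDebug(s"Default parallelism: ${sc.defaultParallelism}")
    logWarning("Shutting down")
    sc.stop()
  }
}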
Example 1: ParquetS3Backup
//Set the package name and import the required classes
package com.unity.analytics.spark.utils.parquet
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{Logging, SparkConf, SparkContext}
object ParquetS3Backup extends Logging{
implicit val formats = org.json4s.DefaultFormats
def main(args: Array[String]): Unit = {
val config = new ParquetS3BackupConfiguration(args)
val sparkConf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sqlContext = new SQLContext(new SparkContext(sparkConf))
if (config.merge()) {
merge(sqlContext, config.srcDir(), config.destDir(), config.numFiles())
} else {
split(sqlContext, config.srcDir(), config.destDir(), config.numFiles())
}
}
// Reads, then merges Parquet files and writes to destDir
def merge(sqlContext: SQLContext, srcDir: String, destDir: String, destNumFiles: Int): Unit = {
logInfo(s"ParquetS3Backup merge - srcDir: $srcDir, destDir: $destDir, destNumFiles: $destNumFiles")
sqlContext.read.parquet(srcDir)
.coalesce(destNumFiles)
.write.mode(SaveMode.Overwrite).parquet(destDir)
}
// Reads, then splits Parquet files and writes to destDir
def split(sqlContext: SQLContext, srcDir: String, destDir: String, destNumFiles: Int): Unit = {
logInfo(s"ParquetS3Backup split - srcDir: $srcDir, destDir: $destDir, destNumFiles: $destNumFiles")
sqlContext.read.parquet(srcDir)
.repartition(destNumFiles)
.write.mode(SaveMode.Overwrite).parquet(destDir)
}
// Reads backupMetadata and does a Backup on each srcDir to destDir, to the set number of files
def backup(sqlContext: SQLContext, backupMetadata: BackupMetadata): Unit = {
backupMetadata.backupEntries.foreach(backupEntry => {
if (backupEntry.destNumFiles <= backupEntry.srcNumFiles) {
merge(sqlContext, backupEntry.srcDir, backupEntry.destDir, backupEntry.destNumFiles)
} else {
split(sqlContext, backupEntry.srcDir, backupEntry.destDir, backupEntry.destNumFiles)
}
})
}
// Reads backupMetadata and restores from destDir to the srcDir, bringing back the original number of files
def restore(sqlContext: SQLContext, backupMetadata: BackupMetadata): Unit = {
backupMetadata.backupEntries.foreach(backupEntry => {
if (backupEntry.srcNumFiles <= backupEntry.destNumFiles) {
merge(sqlContext, backupEntry.destDir, backupEntry.srcDir, backupEntry.srcNumFiles)
} else {
split(sqlContext, backupEntry.destDir, backupEntry.srcDir, backupEntry.srcNumFiles)
}
})
}
}
Developer: UnityTech, Project: parquet-s3-backup, Lines: 59, Source: ParquetS3Backup.scala
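As a rough illustration of how the ParquetS3Backup object above might be driven outside its own main method, the hypothetical sketch below calls merge and split directly; the S3 paths, application name, and file counts are invented for the example:
import com.unity.analytics.spark.utils.parquet.ParquetS3Backup
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object ParquetS3BackupSketch {
  def main(args: Array[String]): Unit = {
    val sqlContext = new SQLContext(new SparkContext(new SparkConf().setAppName("backup-sketch")))
    // Compact a directory of many small Parquet files into 10 larger ones (hypothetical paths).
    ParquetS3Backup.merge(sqlContext, "s3a://my-bucket/events/2016-01-01", "s3a://my-bucket/backup/2016-01-01", 10)
    // Fan the compacted backup back out into 200 files, e.g. before a restore.
    ParquetS3Backup.split(sqlContext, "s3a://my-bucket/backup/2016-01-01", "s3a://my-bucket/restored/2016-01-01", 200)
  }
}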
Example 2: TwitterEmoParser
//Set the package name and import the required classes
package com.aluxian.tweeather.scripts
import com.aluxian.tweeather.RichBoolean
import org.apache.spark.Logging
import org.apache.spark.sql.SaveMode
object TwitterEmoParser extends Script with Logging {
val positiveEmoticons = TwitterEmoCollector.positiveEmoticons
val negativeEmoticons = TwitterEmoCollector.negativeEmoticons
override def main(args: Array[String]) {
super.main(args)
import sqlc.implicits._
// Import data
logInfo("Parsing text files")
val data = sc.textFile("tw/sentiment/emo/*.gz")
.coalesce(99)
.map(_.stripPrefix("RT").trim)
.distinct()
.filter(!_.startsWith("Collected"))
.filter(!_.startsWith("collected"))
.map(text => {
val hasPositive = positiveEmoticons.exists(text.contains)
val hasNegative = negativeEmoticons.exists(text.contains)
if (hasPositive ^ hasNegative) (text, hasPositive.toDouble) else null
})
.filter(_ != null)
logInfo("Saving text files")
data.toDF("raw_text", "label").write.mode(SaveMode.Overwrite)
.parquet("tw/sentiment/emo/parsed/data.parquet")
logInfo("Parsing finished")
sc.stop()
}
}
Developer: cnajeefa, Project: Tourism-Sentiment-Analysis, Lines: 41, Source: TwitterEmoParser.scala
Example 3: Sentiment140Downloader
//Set the package name and import the required classes
package com.aluxian.tweeather.scripts
import java.net.URL
import java.util.zip.ZipInputStream
import org.apache.hadoop.fs.Path
import org.apache.spark.Logging
object Sentiment140Downloader extends Script with Logging {
val downloadUrl = "http://cs.stanford.edu/people/alecmgo/trainingandtestdata.zip"
override def main(args: Array[String]) {
super.main(args)
logInfo(s"Downloading sentiment140 dataset from $downloadUrl")
val zip = new ZipInputStream(new URL(downloadUrl).openStream())
val buffer = new Array[Byte](4 * 1024)
Stream.continually(zip.getNextEntry)
.takeWhile(_ != null)
.foreach { entry =>
val fileName = entry.getName
val out = hdfs.create(new Path(s"tw/sentiment/140/downloaded/$fileName"))
logInfo(s"Downloading $fileName")
Stream.continually(zip.read(buffer))
.takeWhile(_ != -1)
.foreach { count =>
out.write(buffer, 0, count)
}
out.close()
}
zip.close()
logInfo("Downloading finished")
sc.stop()
}
}
Developer: cnajeefa, Project: Tourism-Sentiment-Analysis, Lines: 43, Source: Sentiment140Downloader.scala
Example 4: MQAgent
//Set the package name and import the required classes
package com.asto.dmp.mgtevl.mq
import com.asto.dmp.mgtevl.base.Props
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory, MessageProperties}
import org.apache.spark.Logging
object MQAgent extends Logging {
private val connection: Connection = getConnection
private val channel: Channel = connection.createChannel
def getConnection = {
val connectionFactory = new ConnectionFactory
connectionFactory.setHost(Props.get("rabbit_mq_host"))
connectionFactory.setPort(Props.get("rabbit_mq_port").toInt)
connectionFactory.setUsername(Props.get("rabbit_mq_username"))
connectionFactory.setPassword(Props.get("rabbit_mq_password"))
connectionFactory.newConnection
}
def send(message: String) {
val queueName = Props.get("queue_name_online")
// Declare a durable, non-exclusive, non-auto-delete queue, plus a direct exchange with the same name, and bind them together
channel.queueDeclare(queueName, true, false, false, null)
channel.exchangeDeclare(queueName, "direct", true)
channel.queueBind(queueName, queueName, queueName)
channel.basicPublish(queueName, queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))
}
def close() {
if (Option(channel).isDefined) channel.close()
if (Option(connection).isDefined) connection.close()
}
}
Developer: zj-lingxin, Project: MgtEvl, Lines: 35, Source: MQAgent.scala
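A short, hypothetical caller for the MQAgent object above; the JSON payload is invented, and the RabbitMQ connection details are assumed to come from the Props configuration used in the example:
import com.asto.dmp.mgtevl.mq.MQAgent

object MQAgentSketch {
  def main(args: Array[String]): Unit = {
    try {
      // Publishes to the queue named by the "queue_name_online" property.
      MQAgent.send("""{"shopId":"12345","score":0.87}""")
    } finally {
      MQAgent.close() // always release the channel and connection
    }
  }
}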
Example 5: FileUtils
//Set the package name and import the required classes
package com.asto.dmp.xxx.util
import com.asto.dmp.xxx.base.Constants
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
object FileUtils extends Logging {
private val conf = new Configuration()
conf.set("fs.defaultFS", Constants.Hadoop.DEFAULT_FS)
conf.set("mapreduce.jobtracker.address", Constants.Hadoop.JOBTRACKER_ADDRESS)
def deleteFilesInHDFS(paths: String*) = {
paths.foreach { path =>
val filePath = new Path(path)
val HDFSFilesSystem = filePath.getFileSystem(new Configuration())
if (HDFSFilesSystem.exists(filePath)) {
logInfo(s"?????$filePath")
HDFSFilesSystem.delete(filePath, true)
}
}
}
def saveAsTextFile[T <: Product](rdd: RDD[T], savePath: String) = {
deleteFilesInHDFS(savePath)
logInfo(s"?${savePath}?????")
rdd.map(_.productIterator.mkString(Constants.OutputPath.SEPARATOR)).coalesce(1).saveAsTextFile(savePath)
}
def saveAsTextFile(text: String, savePath: String) = {
deleteFilesInHDFS(savePath)
logInfo(s"?${savePath}?????")
val out = FileSystem.get(conf).create(new Path(savePath))
out.write(text.getBytes)
out.flush()
out.close()
}
}
Developer: zj-lingxin, Project: asto-sparksql, Lines: 42, Source: FileUtils.scala
Example 6: LogUtils
//Set the package name and import the required classes
package org.iamShantanu101.spark.SentimentAnalyzer.utils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SparkContext}
object LogUtils extends Logging {
def setLogLevels(sparkContext: SparkContext) {
sparkContext.setLogLevel(Level.WARN.toString)
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
logInfo(
"""Setting log level to [WARN] for streaming executions.
|To override add a custom log4j.properties to the classpath.""".stripMargin)
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}
Developer: iamShantanu101, Project: Twitter-Sentiment-Analyzer-Apache-Spark-Mllib, Lines: 21, Source: LogUtils.scala
Example 7: Main
//Set the package name and import the required classes
package com.zjlp.face.spark.base
import org.apache.spark.Logging
object Main extends Logging {
def main(args: Array[String]) {
val beginTime = System.currentTimeMillis()
val es = new NewES()
val dao = new MySQL()
if(Props.get("update_relation").toBoolean) es.loadRelationFromES
if(Props.get("update_com_friends").toBoolean) es.loadComFriendsFromES
dao.getOfRoster
es.updateRelationAndComFriends()
if (Props.get("update_ofroster").toBoolean) dao.updateOfRoster
MySparkContext.instance().stop()
logInfo(s"???:${(System.currentTimeMillis() - beginTime) / 1000}s")
}
}
Developer: luciuschina, Project: spark-relation-search, Lines: 19, Source: Main.scala
Example 8: BackupMetadata
//Set the package name and import the required classes
package com.unity.analytics.spark.utils.parquet
import java.sql.DriverManager
import org.apache.spark.Logging
import org.json4s.jackson.Serialization
case class BackupMetadata(
backupId: String,
backupEntries: Array[BackupEntry]
)
case class BackupEntry(
srcDir: String,
destDir: String,
srcNumFiles: Int = 1,
destNumFiles: Int = 1
)
object BackupMetadata extends Logging {
val tableName = "backup_metadata"
implicit val formats = org.json4s.DefaultFormats
def write(backupId: String, backupEntries: Array[BackupEntry], jdbcConfig: Map[String, String]): Unit = {
val connection = DriverManager.getConnection(jdbcConfig.get("url").get)
val backupEntriesJSON = Serialization.write[Array[BackupEntry]](backupEntries)
val sql = s"""INSERT INTO ${BackupMetadata.tableName} (id, entries) VALUES ('$backupId', '$backupEntriesJSON') ON DUPLICATE KEY UPDATE entries = '$backupEntriesJSON'"""
try {
connection.prepareStatement(sql).execute()
}
finally {
connection.close()
}
}
def read(backupId: String, jdbcConfig: Map[String, String]): Option[BackupMetadata] = {
//Read from MySQL
val connection = DriverManager.getConnection(jdbcConfig.get("url").get)
val sql = s"SELECT * FROM $tableName WHERE id = '$backupId'"
try {
val results = connection.prepareStatement(sql).executeQuery()
while (results.next()) {
val backupEntriesJSON = results.getString("entries")
val backupEntries = Serialization.read[Array[BackupEntry]](backupEntriesJSON)
return Some(BackupMetadata(backupId, backupEntries))
}
}
catch {
case e: Exception => {
logError(s"Error loading backup BackupMetadata $backupId - ${e.getMessage}")
}
}
finally {
connection.close()
}
None
}
}
Developer: UnityTech, Project: parquet-s3-backup, Lines: 61, Source: BackupMetadata.scala
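A hedged sketch of a write/read round trip with BackupMetadata; the JDBC URL, backup id, and directory names are placeholders, and the backup_metadata table (columns id and entries) is assumed to already exist:
import com.unity.analytics.spark.utils.parquet.{BackupEntry, BackupMetadata}

object BackupMetadataSketch {
  def main(args: Array[String]): Unit = {
    val jdbcConfig = Map("url" -> "jdbc:mysql://localhost:3306/backups?user=backup&password=secret")
    val entries = Array(BackupEntry("s3a://my-bucket/events", "s3a://my-bucket/backup/events", srcNumFiles = 200, destNumFiles = 10))
    BackupMetadata.write("backup-2016-01-01", entries, jdbcConfig)
    BackupMetadata.read("backup-2016-01-01", jdbcConfig) match {
      case Some(metadata) => println(s"Loaded ${metadata.backupEntries.length} backup entries")
      case None           => println("No metadata found for this backup id")
    }
  }
}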
Example 9: SessionKey
//Set the package name and import the required classes
package com.github.jparkie.spark.cassandra.client
import java.net.InetAddress
import com.datastax.spark.connector.cql.{ AuthConf, CassandraConnector }
import com.github.jparkie.spark.cassandra.conf.SparkCassServerConf
import org.apache.spark.Logging
import scala.collection.mutable
private[cassandra] trait SparkCassSSTableLoaderClientManager extends Serializable with Logging {
case class SessionKey(
hosts: Set[InetAddress],
port: Int,
authConf: AuthConf,
sparkCassServerConf: SparkCassServerConf
) extends Serializable
@transient
private[client] val internalClients = mutable.HashMap.empty[SessionKey, SparkCassSSTableLoaderClient]
private[client] def buildSessionKey(
cassandraConnector: CassandraConnector,
sparkCassServerConf: SparkCassServerConf
): SessionKey = {
SessionKey(cassandraConnector.hosts, cassandraConnector.port, cassandraConnector.authConf, sparkCassServerConf)
}
private[client] def buildClient(
cassandraConnector: CassandraConnector,
sparkCassServerConf: SparkCassServerConf
): SparkCassSSTableLoaderClient = {
val newSession = cassandraConnector.openSession()
logInfo(s"Created SSTableLoaderClient to the following Cassandra nodes: ${cassandraConnector.hosts}")
val sparkCassSSTableLoaderClient = new SparkCassSSTableLoaderClient(newSession, sparkCassServerConf)
sys.addShutdownHook {
logInfo("Closed Cassandra Session for SSTableLoaderClient.")
sparkCassSSTableLoaderClient.stop()
}
sparkCassSSTableLoaderClient
}
private[cassandra] def evictAll(): Unit = synchronized {
internalClients.values.foreach(_.stop())
internalClients.clear()
}
}
object SparkCassSSTableLoaderClientManager extends SparkCassSSTableLoaderClientManager
Developer: jparkie, Project: Spark2Cassandra, Lines: 56, Source: SparkCassSSTableLoaderClientManager.scala
Example 10: Initialize
//Set the package name and import the required classes
import com.typesafe.config.Config
import org.apache.spark.{SparkContext, SparkConf, Logging}
object Initialize extends Logging {
def getSparkContext(conf: Config): SparkContext = {
val sparkConf = new SparkConf()
.setMaster(conf.getString("spark.master"))
.setAppName(conf.getString("spark.appname"))
.set("spark.default.parallelism", conf.getString("spark.parallelism"))
.set("spark.shuffle.service.enabled", "true")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
.set("spark.io.compression.codec", "lz4")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.executor.instances", conf.getString("spark.executor.instances"))
.set("yarn.scheduler.capacity.resource-calculator", "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator")
.set("spark.driver.memory", conf.getString("spark.driver.memory"))
.set("spark.executor.memory", conf.getString("spark.executor.memory"))
new SparkContext(sparkConf)
}
}
Developer: dr4ke616, Project: music-recommender, Lines: 25, Source: Initialize.scala
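A hypothetical caller for the Initialize helper above, loading a Typesafe Config from application.conf; the configuration keys mirror the ones read in getSparkContext and are assumed to be defined there:
import com.typesafe.config.ConfigFactory

object InitializeSketch {
  def main(args: Array[String]): Unit = {
    // application.conf is assumed to define spark.master, spark.appname, spark.parallelism,
    // spark.executor.instances, spark.driver.memory and spark.executor.memory.
    val sc = Initialize.getSparkContext(ConfigFactory.load())
    println(s"Connected to ${sc.master} with default parallelism ${sc.defaultParallelism}")
    sc.stop()
  }
}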
Example 11: TwitterFireRepl
//Set the package name and import the required classes
package com.aluxian.tweeather.scripts
import org.apache.spark.Logging
import org.apache.spark.ml.PipelineModel
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.functions._
import scala.io.Source
object TwitterFireRepl extends Script with Logging {
override def main(args: Array[String]) {
super.main(args)
import sqlc.implicits._
println("Loading fire model...")
sc // dummy call to init the context
val model = PipelineModel.load("/tw/fire/models/fire.model")
println("Done. Write the input as <temperature>,<pressure>,<humidity> and press <enter>")
for (input <- Source.stdin.getLines) {
val t = udf { (input: String) =>
val values = input.split(",").map(_.toDouble)
Vectors.dense(values)
}
val data = sc
.parallelize(Seq(input), 1)
.toDF("kb_input")
.withColumn("raw_input", t(col("kb_input")))
model
.transform(data)
.show(truncate = false)
}
}
}
Developer: cnajeefa, Project: Tourism-Sentiment-Analysis, Lines: 40, Source: TwitterFireRepl.scala
Example 12: TwitterEmoCountryParser
//Set the package name and import the required classes
package com.aluxian.tweeather.scripts
import com.aluxian.tweeather.RichBoolean
import org.apache.spark.Logging
import org.apache.spark.sql.SaveMode
object TwitterEmoCountryParser extends Script with Logging {
val positiveEmoticons = TwitterEmoCollector.positiveEmoticons
val negativeEmoticons = TwitterEmoCollector.negativeEmoticons
override def main(args: Array[String]) {
super.main(args)
import sqlc.implicits._
// Import data
//for neutral sentiment do (hasPositive & hasNegative)
logInfo("Parsing text files")
val data = sc.textFile("tw/sentiment/emoByCountry/*.tar.gz")
//.coalesce(sc.defaultParallelism)
.coalesce(99)
.map(_.stripPrefix("RT").trim)
.distinct()
.filter(!_.startsWith("Collected"))
.filter(!_.startsWith("collected"))
.map(text => {
val hasPositive = positiveEmoticons.exists(text.contains)
val hasNegative = negativeEmoticons.exists(text.contains)
if (hasPositive ^ hasNegative) Seq(text, hasPositive.toDouble).mkString("||") else null
})
.filter(_ != null)
.map(_.split("\\|\\|"))
.map(row => (row(0), parseLong(row(1)).getOrElse(0L), row(2), parseDouble(row(3)).getOrElse(-1.0)))
.filter(row => row._4 != -1.0) //remove rows whose sentiment label did not convert to 0/1
logInfo("Saving text files")
data.toDF("country_code", "time_stamp", "raw_text", "label").write.mode(SaveMode.Overwrite)
.parquet("tw/sentiment/emoByCountry/parsed/data.parquet")
logInfo("Parsing finished")
sc.stop()
}
def parseLong(str: String):Option[Long] = {
try {
Some(str.toLong)
} catch {
case e: NumberFormatException => None
}
}
def parseDouble(str: String):Option[Double] = {
try {
Some(str.toDouble)
} catch {
case e: NumberFormatException => None
}
}
}
Developer: cnajeefa, Project: Tourism-Sentiment-Analysis, Lines: 62, Source: TwitterEmoCountryParser.scala
Example 13: Sentiment140Parser
//Set the package name and import the required classes
package com.aluxian.tweeather.scripts
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SaveMode
object Sentiment140Parser extends Script with Logging {
override def main(args: Array[String]) {
super.main(args)
// Import data
val testData = sc.textFile("tw/sentiment/140/downloaded/testdata.manual.2009.06.14.csv")
val trainingData = sc.textFile("tw/sentiment/140/downloaded/training.1600000.processed.noemoticon.csv")
logInfo(s"Parsing test dataset")
parse(testData, "tw/sentiment/140/parsed/test.parquet")
logInfo(s"Parsing training dataset")
parse(trainingData, "tw/sentiment/140/parsed/training.parquet")
logInfo("Parsing finished")
sc.stop()
}
def parse(data: RDD[String], filePath: String) {
val parsed = data
.filter(_.contains("\",\"")) // ensure correct format
.map(_.split("\",\"").map(_.replace("\"", ""))) // split columns and remove " marks
.filter(row => row.forall(_.nonEmpty)) // ensure columns are not empty
.map(row => (row(0).toDouble, row(5))) // keep sentiment and text only
.filter(row => row._1 != 2) // remove neutral tweets
.map(row => (row._1 / 4, row._2)) // normalize sentiment
.map(row => (row._2, row._1)) // switch values
import sqlc.implicits._
parsed.toDF("raw_text", "label").write.mode(SaveMode.Overwrite).parquet(filePath)
logInfo(s"Parsed and saved $filePath")
}
}
Developer: cnajeefa, Project: Tourism-Sentiment-Analysis, Lines: 43, Source: Sentiment140Parser.scala
Example 14: TwitterEmoRepl
//Set the package name and import the required classes
package com.aluxian.tweeather.scripts
import org.apache.spark.Logging
import org.apache.spark.ml.PipelineModel
import scala.io.Source
object TwitterEmoRepl extends Script with Logging {
override def main(args: Array[String]) {
super.main(args)
import sqlc.implicits._
println("Loading emo model...")
sc // dummy call to init the context
val model = PipelineModel.load("/tw/sentiment/models/emo.model")
println("Done. Write the sentence you want analysed and press <enter>")
for (input <- Source.stdin.getLines) {
val data = sc
.parallelize(Seq(input), 1)
.toDF("raw_text")
model
.transform(data)
.show(truncate = false)
}
}
}
Developer: cnajeefa, Project: Tourism-Sentiment-Analysis, Lines: 32, Source: TwitterEmoRepl.scala
Example 15: Sentiment140Repl
//Set the package name and import the required classes
package com.aluxian.tweeather.scripts
import org.apache.spark.Logging
import org.apache.spark.ml.PipelineModel
import scala.io.Source
object Sentiment140Repl extends Script with Logging {
override def main(args: Array[String]) {
super.main(args)
import sqlc.implicits._
println("Loading 140 model...")
sc // dummy call to init the context
val model = PipelineModel.load("tw/sentiment/models/140.model")
println("Done. Write the sentence you want analysed and press <enter>")
for (input <- Source.stdin.getLines) {
val data = sc
.parallelize(Seq(input), 1)
.toDF("raw_text")
model.transform(data)
.select("probability", "prediction")
.foreach(println)
}
}
}
Developer: cnajeefa, Project: Tourism-Sentiment-Analysis, Lines: 32, Source: Sentiment140Repl.scala
Example 16: TwitterFireCollector
//Set the package name and import the required classes
package com.aluxian.tweeather.scripts
import com.aluxian.tweeather.RichStatus
import com.aluxian.tweeather.models.{Coordinates, LocationBox}
import com.aluxian.tweeather.streaming.TwitterUtils
import org.apache.spark.Logging
import org.apache.spark.streaming.StreamingContext
import twitter4j.FilterQuery
object TwitterFireCollector extends Script with Logging {
val locationBox = LocationBox(
sw = Coordinates(33, -27),
ne = Coordinates(73, 45)
) // Europe
override def main(args: Array[String]) {
super.main(args)
val ssc = new StreamingContext(sc, streamingInterval)
val stream = TwitterUtils.createMultiStream(ssc, queryBuilder)
stream
.map(status => {
val location = status.getApproximateLocation
val text = status.getText.replaceAll("[\\n\\r,]+", " ")
Seq(location.lat, location.lon, status.getCreatedAt.getTime, text).mkString(",")
})
.repartition(sc.defaultParallelism)
.saveAsTextFiles("/tw/fire/collected/", "text")
ssc.start()
if (!ssc.awaitTerminationOrTimeout(streamingTimeout)) {
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}
def queryBuilder(): FilterQuery = {
new FilterQuery()
.locations(locationBox.toTwitterBox: _*)
.language("en")
}
}
Developer: cnajeefa, Project: Tourism-Sentiment-Analysis, Lines: 47, Source: TwitterFireCollector.scala
Example 17: SimhashIndexing
//Set the package name and import the required classes
package io.gzet.story
import java.net.URL
import com.datastax.spark.connector._
import io.gzet.story.model.Article
import io.gzet.story.util.SimhashUtils._
import io.gzet.story.util.{HtmlFetcher, Tokenizer}
import io.gzet.utils.spark.gdelt.GKGParser
import org.apache.lucene.analysis.en.EnglishAnalyzer
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
import scala.util.Try
object SimhashIndexing extends SimpleConfig with Logging {
def main(args: Array[String]) = {
val sc = new SparkContext(new SparkConf().setAppName("GDELT Indexing"))
if (args.isEmpty)
throw new SparkException("usage: <gdeltInputDir>")
val gdeltInputDir = args.head
val gkgRDD = sc.textFile(gdeltInputDir)
.map(GKGParser.toJsonGKGV2)
.map(GKGParser.toCaseClass2)
val urlRDD = gkgRDD.map(g => g.documentId.getOrElse("NA"))
.filter(url => Try(new URL(url)).isSuccess)
.distinct()
.repartition(partitions)
val contentRDD = urlRDD.mapPartitions({ it =>
val html = new HtmlFetcher(gooseConnectionTimeout, gooseSocketTimeout)
it map html.fetch
})
val corpusRDD = contentRDD.mapPartitions({ it =>
val analyzer = new EnglishAnalyzer()
it.map(content => (content, Tokenizer.lucene(content.body, analyzer)))
}).filter({ case (content, corpus) =>
corpus.length > minWords
})
//CREATE TABLE gzet.articles ( hash int PRIMARY KEY, url text, title text, body text );
corpusRDD.mapValues(_.mkString(" ").simhash).map({ case (content, simhash) =>
Article(simhash, content.body, content.title, content.url)
}).saveToCassandra(cassandraKeyspace, cassandraTable)
}
}
Developer: PacktPublishing, Project: Mastering-Spark-for-Data-Science, Lines: 54, Source: SimhashIndexing.scala
Example 18: StoryBatchDedup
//Set the package name and import the required classes
package io.gzet.story
import io.gzet.story.model.{Content, Article}
import org.apache.spark.graphx.{Graph, Edge}
import org.apache.spark.{Logging, SparkConf, SparkContext}
import io.gzet.story.util.SimhashUtils._
import com.datastax.spark.connector._
object StoryBatchDedup extends SimpleConfig with Logging {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("Story Extractor")
val sc = new SparkContext(sparkConf)
val simhashRDD = sc.cassandraTable[Article]("gzet", "articles").zipWithIndex().map({ case (a, id) =>
((id, Content(a.url, a.title, a.body)), a.hash)
})
val duplicateTupleRDD = simhashRDD.flatMap({ case ((id, content), simhash) =>
searchmasks.map({ mask =>
(simhash ^ mask, id)
})
}).groupByKey()
val edgeRDD = duplicateTupleRDD.values.flatMap({ it =>
val list = it.toList
for (x <- list; y <- list) yield (x, y)
}).filter({ case (x, y) =>
x != y
}).distinct().map({case (x, y) =>
Edge(x, y, 0)
})
val duplicateRDD = Graph.fromEdges(edgeRDD, 0L)
.connectedComponents()
.vertices
.join(simhashRDD.keys)
.values
duplicateRDD.sortBy(_._1).collect().foreach({ case (story, content) =>
println(story + "\t" + content.title)
})
}
}
Developer: PacktPublishing, Project: Mastering-Spark-for-Data-Science, Lines: 49, Source: StoryBatchDedup.scala
Example 19: TwitterStream
//Set the package name and import the required classes
package io.gzet.timeseries
import com.google.gson.GsonBuilder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.{Logging, SparkConf, SparkContext}
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
import scala.util.Try
object TwitterStream extends SimpleConfig with Logging {
def getTwitterStream(ssc: StreamingContext, filters: Seq[String] = Nil) = {
val builder = new ConfigurationBuilder()
builder.setOAuthConsumerKey(twitterApiKey)
builder.setOAuthConsumerSecret(twitterApiSecret)
builder.setOAuthAccessToken(twitterTokenKey)
builder.setOAuthAccessTokenSecret(twitterTokenSecret)
val configuration = builder.build()
TwitterUtils.createStream(
ssc,
Some(new OAuthAuthorization(configuration)),
filters,
StorageLevel.MEMORY_ONLY
)
}
def main(args: Array[String]) = {
val sparkConf = new SparkConf().setAppName("Twitter Extractor")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Minutes(5))
val twitterStream = getTwitterStream(ssc, args).mapPartitions({ it =>
val gson = new GsonBuilder().create()
it map { s =>
Try(gson.toJson(s))
}
})
twitterStream
.filter(_.isSuccess)
.map(_.get)
.saveAsTextFiles("twitter")
// Start streaming context
ssc.start()
ssc.awaitTermination()
}
}
Developer: PacktPublishing, Project: Mastering-Spark-for-Data-Science, Lines: 55, Source: TwitterStream.scala
Example 20: LogSchema
//Set the package name and import the required classes
package com.demo.loganalyzer
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
case class LogSchema(address: String,
datetime: String,
action: Option[String]
)
class TransformMapper extends Logging{
def transform(events: RDD[LogSchema]) = {
val e = events.map(x => (x.datetime, 1)).reduceByKey { case (x, y) => x + y }
e.saveAsTextFile("/user/spillai/sparkop/logoutput/")
}
}
object MapRawData extends Serializable with Logging{
def mapRawLine(line: String): Option[LogSchema] = {
try {
val fields = line.split(",", -1).map(_.trim)
Some(
LogSchema(
address = fields(0),
datetime = fields(1).substring(13, 15),
action = if (fields(2).length > 2) Some(fields(2)) else None
)
)
}
catch {
case e: Exception =>
log.warn(s"Unable to parse line: $line")
None
}
}
}
Developer: sreejithpillai, Project: LogAnalyzerSparkScala, Lines: 37, Source: LogSchema.scala
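A brief, hypothetical sketch tying Example 20 together: parse raw log lines with MapRawData.mapRawLine, keep only the lines that parse, and hand them to TransformMapper. The input path and the sample line format are assumptions, chosen so that the substring(13, 15) call in mapRawLine extracts the hour:
import com.demo.loganalyzer.{MapRawData, TransformMapper}
import org.apache.spark.{SparkConf, SparkContext}

object LogAnalyzerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("log-analyzer-sketch"))
    // Each line is assumed to look like: 10.0.0.1,[12/Mar/2016:14:05:21 +0000],GET /index.html
    val events = sc.textFile("/user/spillai/sparkop/loginput/")
      .flatMap(line => MapRawData.mapRawLine(line)) // Option[LogSchema] -> keep only successfully parsed lines
    new TransformMapper().transform(events) // counts events per parsed datetime key (the hour substring) and saves the result
    sc.stop()
  }
}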
Note: The org.apache.spark.Logging class examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The code snippets were selected from open-source projects contributed by various developers; copyright remains with the original authors, and distribution and use should follow the license of the corresponding project. Do not reproduce without permission.