
Scala Logging Class Code Examples


This article collects typical usage examples of the org.apache.spark.Logging class in Scala. If you are wondering what the Logging class does, how to use it, or what real-world code that uses it looks like, the curated examples below should help.



The sections below present 20 code examples of the Logging class, sorted by popularity by default. You can upvote the examples you like or find useful; your ratings help the system recommend better Scala code examples.
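All twenty examples share the same basic pattern: mix the org.apache.spark.Logging trait into an object or class and call the logInfo / logWarning / logError helpers it provides. The minimal sketch below shows that pattern in isolation; the object name LoggingSketch, the app name, the local master and the counting job are illustrative only, and the sketch assumes Spark 1.x, where org.apache.spark.Logging is still publicly accessible (in Spark 2.x the trait was moved to org.apache.spark.internal.Logging and made private to Spark).

import org.apache.spark.{Logging, SparkConf, SparkContext}

// Minimal sketch of the shared pattern (illustrative names, Spark 1.x assumed)
object LoggingSketch extends Logging {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("logging-sketch").setMaster("local[2]"))
    logInfo("Job started")                        // logInfo comes from the mixed-in Logging trait
    val count = sc.parallelize(1 to 100).count()
    logInfo(s"Counted $count elements")
    logWarning("logWarning, logError and logDebug are also provided by the trait")
    sc.stop()
  }
}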

Example 1: ParquetS3Backup

// Set the package name and import the required classes
package com.unity.analytics.spark.utils.parquet

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{Logging, SparkConf, SparkContext}


object ParquetS3Backup extends Logging {
  implicit val formats = org.json4s.DefaultFormats

  def main(args: Array[String]): Unit = {
    val config = new ParquetS3BackupConfiguration(args)
    val sparkConf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sqlContext = new SQLContext(new SparkContext(sparkConf))
    config.merge() match {
      case true => merge(sqlContext, config.srcDir(), config.destDir(), config.numFiles())
      case false => split(sqlContext, config.srcDir(), config.destDir(), config.numFiles())
    }
  }

  
  // Reads, then merges Parquet files and writes to destDir
  def merge(sqlContext: SQLContext, srcDir: String, destDir: String, destNumFiles: Int): Unit = {
    logInfo(s"ParquetS3Backup merge - srcDir: $srcDir, destDir: $destDir, destNumFiles: $destNumFiles")
    sqlContext.read.parquet(srcDir)
      .coalesce(destNumFiles)
      .write.mode(SaveMode.Overwrite).parquet(destDir)
  }

  // Reads, then splits Parquet files and writes to destDir
  def split(sqlContext: SQLContext, srcDir: String, destDir: String, destNumFiles: Int): Unit = {
    logInfo(s"ParquetS3Backup split - srcDir: $srcDir, destDir: $destDir, destNumFiles: $destNumFiles")
    sqlContext.read.parquet(srcDir)
      .repartition(destNumFiles)
      .write.mode(SaveMode.Overwrite).parquet(destDir)
  }

  //  Reads backupMetadata and does a Backup on each srcDir to destDir, to the set number of files
  def backup(sqlContext: SQLContext, backupMetadata: BackupMetadata): Unit = {
    backupMetadata.backupEntries.foreach(backupEntry => {
      if (backupEntry.destNumFiles <= backupEntry.srcNumFiles) {
        merge(sqlContext, backupEntry.srcDir, backupEntry.destDir, backupEntry.destNumFiles)
      } else {
        split(sqlContext, backupEntry.srcDir, backupEntry.destDir, backupEntry.destNumFiles)
      }
    })
  }

  // Reads backupMetadata and restores from destDir to the srcDir, bringing back the original number of files
  def restore(sqlContext: SQLContext, backupMetadata: BackupMetadata): Unit = {
    backupMetadata.backupEntries.foreach(backupEntry => {
      if (backupEntry.srcNumFiles <= backupEntry.destNumFiles) {
        merge(sqlContext, backupEntry.destDir, backupEntry.srcDir, backupEntry.srcNumFiles)
      } else {
        split(sqlContext, backupEntry.destDir, backupEntry.srcDir, backupEntry.srcNumFiles)
      }
    })
  }
} 
Developer: UnityTech, Project: parquet-s3-backup, Lines: 59, Source: ParquetS3Backup.scala


Example 2: TwitterEmoParser

// Set the package name and import the required classes
package com.aluxian.tweeather.scripts

import com.aluxian.tweeather.RichBoolean
import org.apache.spark.Logging
import org.apache.spark.sql.SaveMode


object TwitterEmoParser extends Script with Logging {

  val positiveEmoticons = TwitterEmoCollector.positiveEmoticons
  val negativeEmoticons = TwitterEmoCollector.negativeEmoticons

  override def main(args: Array[String]) {
    super.main(args)
    import sqlc.implicits._

    // Import data
    logInfo("Parsing text files")
    val data = sc.textFile("tw/sentiment/emo/*.gz")
      .coalesce(99)
      .map(_.stripPrefix("RT").trim)
      .distinct()
      .filter(!_.startsWith("Collected"))
      .filter(!_.startsWith("collected"))
      .map(text => {
        val hasPositive = positiveEmoticons.exists(text.contains)
        val hasNegative = negativeEmoticons.exists(text.contains)
        if (hasPositive ^ hasNegative) (text, hasPositive.toDouble) else null
      })
      .filter(_ != null)

    logInfo("Saving text files")
    data.toDF("raw_text", "label").write.mode(SaveMode.Overwrite)
      .parquet("tw/sentiment/emo/parsed/data.parquet")

    logInfo("Parsing finished")
    sc.stop()
  }

} 
Developer: cnajeefa, Project: Tourism-Sentiment-Analysis, Lines: 41, Source: TwitterEmoParser.scala


Example 3: Sentiment140Downloader

// Set the package name and import the required classes
package com.aluxian.tweeather.scripts

import java.net.URL
import java.util.zip.ZipInputStream

import org.apache.hadoop.fs.Path
import org.apache.spark.Logging


object Sentiment140Downloader extends Script with Logging {

  val downloadUrl = "http://cs.stanford.edu/people/alecmgo/trainingandtestdata.zip"

  override def main(args: Array[String]) {
    super.main(args)

    logInfo(s"Downloading sentiment140 dataset from $downloadUrl")
    val zip = new ZipInputStream(new URL(downloadUrl).openStream())
    val buffer = new Array[Byte](4 * 1024)

    Stream.continually(zip.getNextEntry)
      .takeWhile(_ != null)
      .foreach { entry =>
        val fileName = entry.getName
        val out = hdfs.create(new Path(s"tw/sentiment/140/downloaded/$fileName"))
        logInfo(s"Downloading $fileName")

        Stream.continually(zip.read(buffer))
          .takeWhile(_ != -1)
          .foreach { count =>
            out.write(buffer, 0, count)
          }

        out.close()
      }

    zip.close()
    logInfo("Downloading finished")
    sc.stop()
  }

} 
Developer: cnajeefa, Project: Tourism-Sentiment-Analysis, Lines: 43, Source: Sentiment140Downloader.scala


Example 4: MQAgent

// Set the package name and import the required classes
package com.asto.dmp.mgtevl.mq

import com.asto.dmp.mgtevl.base.Props
import com.rabbitmq.client.{Channel, Connection, ConnectionFactory, MessageProperties}
import org.apache.spark.Logging

object MQAgent extends Logging {
  private val connection: Connection = getConnection
  private val channel: Channel = connection.createChannel

  def getConnection = {
    val connectionFactory = new ConnectionFactory
    connectionFactory.setHost(Props.get("rabbit_mq_host"))
    connectionFactory.setPort(Props.get("rabbit_mq_port").toInt)
    connectionFactory.setUsername(Props.get("rabbit_mq_username"))
    connectionFactory.setPassword(Props.get("rabbit_mq_password"))
    connectionFactory.newConnection
  }

  def send(message: String) {
    val queueName = Props.get("queue_name_online")
    // Declare a durable queue and a direct exchange, then bind the queue to the exchange
    channel.queueDeclare(queueName, true, false, false, null)
    channel.exchangeDeclare(queueName, "direct", true)
    channel.queueBind(queueName, queueName, queueName)
    channel.basicPublish(queueName, queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"))
  }

  def close() {
    if (Option(channel).isDefined) channel.close()
    if (Option(connection).isDefined) connection.close()
  }

} 
Developer: zj-lingxin, Project: MgtEvl, Lines: 35, Source: MQAgent.scala


Example 5: FileUtils

// Set the package name and import the required classes
package com.asto.dmp.xxx.util

import com.asto.dmp.xxx.base.Constants
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD


object FileUtils extends Logging {
  private val conf = new Configuration()
  conf.set("fs.defaultFS", Constants.Hadoop.DEFAULT_FS)
  conf.set("mapreduce.jobtracker.address", Constants.Hadoop.JOBTRACKER_ADDRESS)

  def deleteFilesInHDFS(paths: String*) = {
    paths.foreach { path =>
      val filePath = new Path(path)
      val HDFSFilesSystem = filePath.getFileSystem(new Configuration())
      if (HDFSFilesSystem.exists(filePath)) {
        logInfo(s"?????$filePath")
        HDFSFilesSystem.delete(filePath, true)
      }
    }
  }

  def saveAsTextFile[T <: Product](rdd: RDD[T], savePath: String) = {
    deleteFilesInHDFS(savePath)
    logInfo(s"?${savePath}?????")
    rdd.map(_.productIterator.mkString(Constants.OutputPath.SEPARATOR)).coalesce(1).saveAsTextFile(savePath)
  }

  def saveAsTextFile(text: String, savePath: String) = {
    deleteFilesInHDFS(savePath)
    logInfo(s"?${savePath}?????")
    val out = FileSystem.get(conf).create(new Path(savePath))
    out.write(text.getBytes)
    out.flush()
    out.close()
  }
} 
Developer: zj-lingxin, Project: asto-sparksql, Lines: 42, Source: FileUtils.scala


Example 6: LogUtils

// Set the package name and import the required classes
package org.iamShantanu101.spark.SentimentAnalyzer.utils

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SparkContext}


object LogUtils extends Logging {

  def setLogLevels(sparkContext: SparkContext) {

    sparkContext.setLogLevel(Level.WARN.toString)
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      logInfo(
        """Setting log level to [WARN] for streaming executions.
          |To override add a custom log4j.properties to the classpath.""".stripMargin)
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
} 
Developer: iamShantanu101, Project: Twitter-Sentiment-Analyzer-Apache-Spark-Mllib, Lines: 21, Source: LogUtils.scala


Example 7: Main

// Set the package name and import the required classes
package com.zjlp.face.spark.base

import org.apache.spark.Logging

object Main extends Logging {
  def main(args: Array[String]) {
    val beginTime = System.currentTimeMillis()
    val es = new NewES()
    val dao = new MySQL()
    if(Props.get("update_relation").toBoolean) es.loadRelationFromES
    if(Props.get("update_com_friends").toBoolean) es.loadComFriendsFromES
    dao.getOfRoster
    es.updateRelationAndComFriends()
    if (Props.get("update_ofroster").toBoolean) dao.updateOfRoster
    MySparkContext.instance().stop()
    logInfo(s"???:${(System.currentTimeMillis() - beginTime) / 1000}s")
  }
} 
Developer: luciuschina, Project: spark-relation-search, Lines: 19, Source: Main.scala


Example 8: BackupMetadata

// Set the package name and import the required classes
package com.unity.analytics.spark.utils.parquet

import java.sql.DriverManager

import org.apache.spark.Logging
import org.json4s.jackson.Serialization


case class BackupMetadata(
                           backupId: String,
                           backupEntries: Array[BackupEntry]
                         )

case class BackupEntry(
                        srcDir: String,
                        destDir: String,
                        srcNumFiles: Int = 1,
                        destNumFiles: Int = 1
                      )

object BackupMetadata extends Logging {
  val tableName = "backup_metadata"
  implicit val formats = org.json4s.DefaultFormats

  def write(backupId: String, backupEntries: Array[BackupEntry], jdbcConfig: Map[String, String]): Unit = {
    val connection = DriverManager.getConnection(jdbcConfig.get("url").get)
    val backupEntriesJSON = Serialization.write[Array[BackupEntry]](backupEntries)
    val sql = s"""INSERT INTO ${BackupMetadata.tableName} (id, entries) VALUES ('$backupId', '$backupEntriesJSON') ON DUPLICATE KEY UPDATE entries = '$backupEntriesJSON'"""

    try {
      connection.prepareStatement(sql).execute()
    }
    finally {
      connection.close()
    }
  }

  def read(backupId: String, jdbcConfig: Map[String, String]): Option[BackupMetadata] = {
    //Read from MySQL
    val connection = DriverManager.getConnection(jdbcConfig.get("url").get)
    val sql = s"SELECT * FROM $tableName WHERE id = '$backupId'"
    try {
      val results = connection.prepareStatement(sql).executeQuery()
      while (results.next()) {
        val backupEntriesJSON = results.getString("entries")
        val backupEntries = Serialization.read[Array[BackupEntry]](backupEntriesJSON)
        return Some(BackupMetadata(backupId, backupEntries))
      }
    }
    catch {
      case e: Exception => {
        logError(s"Error loading backup BackupMetadata $backupId - ${e.getMessage}")
      }
    }
    finally {
      connection.close()
    }
    None
  }
} 
Developer: UnityTech, Project: parquet-s3-backup, Lines: 61, Source: BackupMetadata.scala


Example 9: SessionKey

// Set the package name and import the required classes
package com.github.jparkie.spark.cassandra.client

import java.net.InetAddress

import com.datastax.spark.connector.cql.{ AuthConf, CassandraConnector }
import com.github.jparkie.spark.cassandra.conf.SparkCassServerConf
import org.apache.spark.Logging

import scala.collection.mutable

private[cassandra] trait SparkCassSSTableLoaderClientManager extends Serializable with Logging {
  case class SessionKey(
    hosts:               Set[InetAddress],
    port:                Int,
    authConf:            AuthConf,
    sparkCassServerConf: SparkCassServerConf
  ) extends Serializable

  @transient
  private[client] val internalClients = mutable.HashMap.empty[SessionKey, SparkCassSSTableLoaderClient]

  private[client] def buildSessionKey(
    cassandraConnector:  CassandraConnector,
    sparkCassServerConf: SparkCassServerConf
  ): SessionKey = {
    SessionKey(cassandraConnector.hosts, cassandraConnector.port, cassandraConnector.authConf, sparkCassServerConf)
  }

  private[client] def buildClient(
    cassandraConnector:  CassandraConnector,
    sparkCassServerConf: SparkCassServerConf
  ): SparkCassSSTableLoaderClient = {
    val newSession = cassandraConnector.openSession()

    logInfo(s"Created SSTableLoaderClient to the following Cassandra nodes: ${cassandraConnector.hosts}")

    val sparkCassSSTableLoaderClient = new SparkCassSSTableLoaderClient(newSession, sparkCassServerConf)

    sys.addShutdownHook {
      logInfo("Closed Cassandra Session for SSTableLoaderClient.")

      sparkCassSSTableLoaderClient.stop()
    }

    sparkCassSSTableLoaderClient
  }

  
  private[cassandra] def evictAll(): Unit = synchronized {
    internalClients.values.foreach(_.stop())
    internalClients.clear()
  }
}

object SparkCassSSTableLoaderClientManager extends SparkCassSSTableLoaderClientManager 
Developer: jparkie, Project: Spark2Cassandra, Lines: 56, Source: SparkCassSSTableLoaderClientManager.scala


Example 10: Initialize

// Set the package name and import the required classes
import com.typesafe.config.Config
import org.apache.spark.{SparkContext, SparkConf, Logging}

object Initialize extends Logging {

  def getSparkContext(conf: Config): SparkContext = {
    val sparkConf = new SparkConf()
      .setMaster(conf.getString("spark.master"))
      .setAppName(conf.getString("spark.appname"))
      .set("spark.default.parallelism", conf.getString("spark.parallelism"))
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
      .set("spark.io.compression.codec", "lz4")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.executor.instances", conf.getString("spark.executor.instances"))
      .set("yarn.scheduler.capacity.resource-calculator", "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator")
      .set("spark.driver.memory", conf.getString("spark.driver.memory"))
      .set("spark.executor.memory", conf.getString("spark.executor.memory"))

    new SparkContext(sparkConf)
  }

} 
Developer: dr4ke616, Project: music-recommender, Lines: 25, Source: Initialize.scala


Example 11: TwitterFireRepl

// Set the package name and import the required classes
package com.aluxian.tweeather.scripts

import org.apache.spark.Logging
import org.apache.spark.ml.PipelineModel
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.functions._

import scala.io.Source


object TwitterFireRepl extends Script with Logging {

  override def main(args: Array[String]) {
    super.main(args)
    import sqlc.implicits._

    println("Loading fire model...")
    sc // dummy call to init the context
    val model = PipelineModel.load("/tw/fire/models/fire.model")
    println("Done. Write the input as <temperature>,<pressure>,<humidity> and press <enter>")

    for (input <- Source.stdin.getLines) {
      val t = udf { (input: String) =>
        val values = input.split(",").map(_.toDouble)
        Vectors.dense(values)
      }

      val data = sc
        .parallelize(Seq(input), 1)
        .toDF("kb_input")
        .withColumn("raw_input", t(col("kb_input")))

      model
        .transform(data)
        .show(truncate = false)
    }
  }

} 
Developer: cnajeefa, Project: Tourism-Sentiment-Analysis, Lines: 40, Source: TwitterFireRepl.scala


Example 12: TwitterEmoCountryParser

// Set the package name and import the required classes
package com.aluxian.tweeather.scripts

import com.aluxian.tweeather.RichBoolean
import org.apache.spark.Logging
import org.apache.spark.sql.SaveMode


object TwitterEmoCountryParser extends Script with Logging {

  val positiveEmoticons = TwitterEmoCollector.positiveEmoticons
  val negativeEmoticons = TwitterEmoCollector.negativeEmoticons

  override def main(args: Array[String]) {
    super.main(args)
    import sqlc.implicits._

    // Import data
    //for neutral sentiment do (hasPositive & hasNegative)
    logInfo("Parsing text files")
    val data = sc.textFile("tw/sentiment/emoByCountry/*.tar.gz")
      //.coalesce(sc.defaultParallelism)
      .coalesce(99)
      .map(_.stripPrefix("RT").trim)
      .distinct()
      .filter(!_.startsWith("Collected"))
      .filter(!_.startsWith("collected"))
      .map(text => {
        val hasPositive = positiveEmoticons.exists(text.contains)
        val hasNegative = negativeEmoticons.exists(text.contains)
        if (hasPositive ^ hasNegative) Seq(text, hasPositive.toDouble).mkString("||") else null
      })
      .filter(_ != null)
      .map(_.split("\\|\\|"))
      .map(row => (row(0), parseLong(row(1)).getOrElse(0L), row(2), parseDouble(row(3)).getOrElse(-1.0)))
      .filter(row => row._4 != -1.0) // remove rows whose sentiment label did not parse to 0/1

    logInfo("Saving text files")
    data.toDF("country_code", "time_stamp", "raw_text", "label").write.mode(SaveMode.Overwrite)
      .parquet("tw/sentiment/emoByCountry/parsed/data.parquet")

    logInfo("Parsing finished")
    sc.stop()
  }

  def parseLong(str: String):Option[Long] = {
    try {
      Some(str.toLong)
    } catch {
      case e: NumberFormatException => None
    }
  }

  def parseDouble(str: String):Option[Double] = {
    try {
      Some(str.toDouble)
    } catch {
      case e: NumberFormatException => None
    }
  }

} 
Developer: cnajeefa, Project: Tourism-Sentiment-Analysis, Lines: 62, Source: TwitterEmoCountryParser.scala


Example 13: Sentiment140Parser

// Set the package name and import the required classes
package com.aluxian.tweeather.scripts

import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SaveMode


object Sentiment140Parser extends Script with Logging {

  override def main(args: Array[String]) {
    super.main(args)

    // Import data
    val testData = sc.textFile("tw/sentiment/140/downloaded/testdata.manual.2009.06.14.csv")
    val trainingData = sc.textFile("tw/sentiment/140/downloaded/training.1600000.processed.noemoticon.csv")

    logInfo(s"Parsing test dataset")
    parse(testData, "tw/sentiment/140/parsed/test.parquet")

    logInfo(s"Parsing training dataset")
    parse(trainingData, "tw/sentiment/140/parsed/training.parquet")

    logInfo("Parsing finished")
    sc.stop()
  }

  def parse(data: RDD[String], filePath: String) {
    val parsed = data
      .filter(_.contains("\",\"")) // ensure correct format
      .map(_.split("\",\"").map(_.replace("\"", ""))) // split columns and remove " marks
      .filter(row => row.forall(_.nonEmpty)) // ensure columns are not empty
      .map(row => (row(0).toDouble, row(5))) // keep sentiment and text only
      .filter(row => row._1 != 2) // remove neutral tweets
      .map(row => (row._1 / 4, row._2)) // normalize sentiment
      .map(row => (row._2, row._1)) // switch values

    import sqlc.implicits._
    parsed.toDF("raw_text", "label").write.mode(SaveMode.Overwrite).parquet(filePath)
    logInfo(s"Parsed and saved $filePath")
  }

} 
Developer: cnajeefa, Project: Tourism-Sentiment-Analysis, Lines: 43, Source: Sentiment140Parser.scala


Example 14: TwitterEmoRepl

// Set the package name and import the required classes
package com.aluxian.tweeather.scripts

import org.apache.spark.Logging
import org.apache.spark.ml.PipelineModel

import scala.io.Source


object TwitterEmoRepl extends Script with Logging {

  override def main(args: Array[String]) {
    super.main(args)
    import sqlc.implicits._

    println("Loading emo model...")
    sc // dummy call to init the context
    val model = PipelineModel.load("/tw/sentiment/models/emo.model")
    println("Done. Write the sentence you want analysed and press <enter>")

    for (input <- Source.stdin.getLines) {
      val data = sc
        .parallelize(Seq(input), 1)
        .toDF("raw_text")

      model
        .transform(data)
        .show(truncate = false)
    }
  }

} 
Developer: cnajeefa, Project: Tourism-Sentiment-Analysis, Lines: 32, Source: TwitterEmoRepl.scala


Example 15: Sentiment140Repl

// Set the package name and import the required classes
package com.aluxian.tweeather.scripts

import org.apache.spark.Logging
import org.apache.spark.ml.PipelineModel

import scala.io.Source


object Sentiment140Repl extends Script with Logging {

  override def main(args: Array[String]) {
    super.main(args)
    import sqlc.implicits._

    println("Loading 140 model...")
    sc // dummy call to init the context
    val model = PipelineModel.load("tw/sentiment/models/140.model")
    println("Done. Write the sentence you want analysed and press <enter>")

    for (input <- Source.stdin.getLines) {
      val data = sc
        .parallelize(Seq(input), 1)
        .toDF("raw_text")

      model.transform(data)
        .select("probability", "prediction")
        .foreach(println)
    }
  }

} 
Developer: cnajeefa, Project: Tourism-Sentiment-Analysis, Lines: 32, Source: Sentiment140Repl.scala


Example 16: TwitterFireCollector

// Set the package name and import the required classes
package com.aluxian.tweeather.scripts

import com.aluxian.tweeather.RichStatus
import com.aluxian.tweeather.models.{Coordinates, LocationBox}
import com.aluxian.tweeather.streaming.TwitterUtils
import org.apache.spark.Logging
import org.apache.spark.streaming.StreamingContext
import twitter4j.FilterQuery


object TwitterFireCollector extends Script with Logging {

  val locationBox = LocationBox(
    sw = Coordinates(33, -27),
    ne = Coordinates(73, 45)
  ) // Europe

  override def main(args: Array[String]) {
    super.main(args)

    val ssc = new StreamingContext(sc, streamingInterval)
    val stream = TwitterUtils.createMultiStream(ssc, queryBuilder)

    stream
      .map(status => {
        val location = status.getApproximateLocation
        val text = status.getText.replaceAll("[\\n\\r,]+", " ")
        Seq(location.lat, location.lon, status.getCreatedAt.getTime, text).mkString(",")
      })
      .repartition(sc.defaultParallelism)
      .saveAsTextFiles("/tw/fire/collected/", "text")

    ssc.start()

    if (!ssc.awaitTerminationOrTimeout(streamingTimeout)) {
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  }

  def queryBuilder(): FilterQuery = {
    new FilterQuery()
      .locations(locationBox.toTwitterBox: _*)
      .language("en")
  }

} 
Developer: cnajeefa, Project: Tourism-Sentiment-Analysis, Lines: 47, Source: TwitterFireCollector.scala


Example 17: SimhashIndexing

// Set the package name and import the required classes
package io.gzet.story

import java.net.URL

import com.datastax.spark.connector._
import io.gzet.story.model.Article
import io.gzet.story.util.SimhashUtils._
import io.gzet.story.util.{HtmlFetcher, Tokenizer}
import io.gzet.utils.spark.gdelt.GKGParser
import org.apache.lucene.analysis.en.EnglishAnalyzer
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}

import scala.util.Try

object SimhashIndexing extends SimpleConfig with Logging {

  def main(args: Array[String]) = {

    val sc = new SparkContext(new SparkConf().setAppName("GDELT Indexing"))

    if (args.isEmpty)
      throw new SparkException("usage: <gdeltInputDir>")

    val gdeltInputDir = args.head
    val gkgRDD = sc.textFile(gdeltInputDir)
      .map(GKGParser.toJsonGKGV2)
      .map(GKGParser.toCaseClass2)

    val urlRDD = gkgRDD.map(g => g.documentId.getOrElse("NA"))
      .filter(url => Try(new URL(url)).isSuccess)
      .distinct()
      .repartition(partitions)

    val contentRDD = urlRDD.mapPartitions({ it =>
      val html = new HtmlFetcher(gooseConnectionTimeout, gooseSocketTimeout)
      it map html.fetch
    })

    val corpusRDD = contentRDD.mapPartitions({ it =>
      val analyzer = new EnglishAnalyzer()
      it.map(content => (content, Tokenizer.lucene(content.body, analyzer)))
    }).filter({ case (content, corpus) =>
      corpus.length > minWords
    })

    //CREATE TABLE gzet.articles ( hash int PRIMARY KEY, url text, title text, body text );
    corpusRDD.mapValues(_.mkString(" ").simhash).map({ case (content, simhash) =>
      Article(simhash, content.body, content.title, content.url)
    }).saveToCassandra(cassandraKeyspace, cassandraTable)

  }

} 
Developer: PacktPublishing, Project: Mastering-Spark-for-Data-Science, Lines: 54, Source: SimhashIndexing.scala


Example 18: StoryBatchDedup

// Set the package name and import the required classes
package io.gzet.story

import io.gzet.story.model.{Content, Article}
import org.apache.spark.graphx.{Graph, Edge}
import org.apache.spark.{Logging, SparkConf, SparkContext}
import io.gzet.story.util.SimhashUtils._
import com.datastax.spark.connector._

object StoryBatchDedup extends SimpleConfig with Logging {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("Story Extractor")
    val sc = new SparkContext(sparkConf)

    val simhashRDD = sc.cassandraTable[Article]("gzet", "articles").zipWithIndex().map({ case (a, id) =>
      ((id, Content(a.url, a.title, a.body)), a.hash)
    })
    Set(0)

    val duplicateTupleRDD = simhashRDD.flatMap({ case ((id, content), simhash) =>
      searchmasks.map({ mask =>
        (simhash ^ mask, id)
      })
    }).groupByKey()

    val edgeRDD = duplicateTupleRDD.values.flatMap({ it =>
      val list = it.toList
      for (x <- list; y <- list) yield (x, y)
    }).filter({ case (x, y) =>
      x != y
    }).distinct().map({case (x, y) =>
      Edge(x, y, 0)
    })

    val duplicateRDD = Graph.fromEdges(edgeRDD, 0L)
      .connectedComponents()
      .vertices
      .join(simhashRDD.keys)
      .values

    duplicateRDD.sortBy(_._1).collect().foreach({ case (story, content) =>
      println(story + "\t" + content.title)
    })

  }

} 
Developer: PacktPublishing, Project: Mastering-Spark-for-Data-Science, Lines: 49, Source: StoryBatchDedup.scala


Example 19: TwitterStream

// Set the package name and import the required classes
package io.gzet.timeseries

import com.google.gson.GsonBuilder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.{Logging, SparkConf, SparkContext}
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

import scala.util.Try

object TwitterStream extends SimpleConfig with Logging {

  def getTwitterStream(ssc: StreamingContext, filters: Seq[String] = Nil) = {
    val builder = new ConfigurationBuilder()
    builder.setOAuthConsumerKey(twitterApiKey)
    builder.setOAuthConsumerSecret(twitterApiSecret)
    builder.setOAuthAccessToken(twitterTokenKey)
    builder.setOAuthAccessTokenSecret(twitterTokenSecret)
    val configuration = builder.build()
    TwitterUtils.createStream(
      ssc,
      Some(new OAuthAuthorization(configuration)),
      filters,
      StorageLevel.MEMORY_ONLY
    )
  }

  def main(args: Array[String]) = {

    val sparkConf = new SparkConf().setAppName("Twitter Extractor")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Minutes(5))

    val twitterStream = getTwitterStream(ssc, args).mapPartitions({ it =>
      val gson = new GsonBuilder().create()
      it map { s =>
        Try(gson.toJson(s))
      }
    })

    twitterStream
      .filter(_.isSuccess)
      .map(_.get)
      .saveAsTextFiles("twitter")

    // Start streaming context
    ssc.start()
    ssc.awaitTermination()

  }

} 
Developer: PacktPublishing, Project: Mastering-Spark-for-Data-Science, Lines: 55, Source: TwitterStream.scala


Example 20: LogSchema

// Set the package name and import the required classes
package com.demo.loganalyzer

import org.apache.spark.Logging
import org.apache.spark.rdd.RDD

case class LogSchema(address: String,
                     datetime: String,
                     action: Option[String]
                    )

class TransformMapper extends Logging {
  def transform(events: RDD[LogSchema]) = {
    val e = events.map(x => (x.datetime, 1)).reduceByKey { case (x, y) => x + y }
    e.saveAsTextFile("/user/spillai/sparkop/logoutput/")
  }
}

object MapRawData extends Serializable with Logging {
  def mapRawLine(line: String): Option[LogSchema] = {
    try {
      val fields = line.split(",", -1).map(_.trim)
      Some(
        LogSchema(
          address = fields(0),
          datetime = fields(1).substring(13, 15),
          action = if (fields(2).length > 2) Some(fields(2)) else None
        )
      )
    }
    catch {
      case e: Exception =>
        log.warn(s"Unable to parse line: $line")
        None
    }
  }
} 
Developer: sreejithpillai, Project: LogAnalyzerSparkScala, Lines: 37, Source: LogSchema.scala



Note: The org.apache.spark.Logging class examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects contributed by various developers; copyright remains with the original authors. Please consult the corresponding project's license before redistributing or using the code. Do not reproduce without permission.

