• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala TwitterUtils类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.spark.streaming.twitter.TwitterUtils的典型用法代码示例。如果您正苦于以下问题:Scala TwitterUtils类的具体用法?Scala TwitterUtils怎么用?Scala TwitterUtils使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了TwitterUtils类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: TwitterStreaming

//设置package包名称以及导入依赖的类
package local.riverside

import org.apache.lucene.analysis.ja.JapaneseAnalyzer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.spark.SparkConf
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
 * Streams tweets, splits each tweet's text into tokens with Lucene's
 * `JapaneseAnalyzer`, counts the tokens per one-minute batch and writes the
 * counts as text files under the `output/tweet` prefix.
 *
 * Command-line arguments, if any, are passed to Twitter as filter keywords.
 */
object TwitterStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Twitter Streaming")
    val ssc = new StreamingContext(conf, Durations.minutes(1L))

    // An empty argument array simply becomes an empty filter list.
    val stream = TwitterUtils.createStream(ssc, None, args.toList)

    stream
      .flatMap(status => tokenize(status.getText))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .saveAsTextFiles("output/tweet")

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Tokenizes `text` and returns the terms in the order the analyzer emits them.
   *
   * Both the analyzer and the token stream are closed before returning, even
   * when tokenization fails part-way through.
   */
  private def tokenize(text: String): Vector[String] = {
    val analyzer = new JapaneseAnalyzer
    try {
      val tokenStream = analyzer.tokenStream("", text)
      try {
        val charAttr = tokenStream.addAttribute(classOf[CharTermAttribute])
        tokenStream.reset()
        val tokens = Iterator
          .continually(tokenStream.incrementToken())
          .takeWhile(identity)
          .map(_ => charAttr.toString) // the attribute is reused, so copy its text per token
          .toVector
        tokenStream.end()
        tokens
      } finally {
        tokenStream.close()
      }
    } finally {
      analyzer.close()
    }
  }
}
开发者ID:ryumei,项目名称:twitter-spark-streaming-sample,代码行数:45,代码来源:TwitterStreaming.scala


示例2: VeChallengeRealTime

//设置package包名称以及导入依赖的类
package io.github.adrianulbona.ve

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Minutes, StreamingContext}
import twitter4j.Place


/**
 * Streams tweets matching the keyword "challenge" and, for every two-minute
 * batch, prints the five country codes that occur most often among the
 * tweets that carry a place.
 */
object VeChallengeRealTime {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder
      .master("local[*]")
      .appName("ve-challenge")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Minutes(2))
    val stream = TwitterUtils.createStream(ssc, None, Seq("challenge"))

    // Option(...) turns a null place into None, so statuses without a place
    // are dropped here without ever calling `.get`.
    val places: DStream[Place] =
      stream.flatMap(status => Option(status.getPlace).toList)

    places
      .map(_.getCountryCode)
      .countByValue()
      .foreachRDD { batch =>
        val top5 = batch.sortBy({ case (_, count) => count }, ascending = false).take(5)
        printStats(top5)
      }

    ssc.start()
    ssc.awaitTermination()

    spark.stop()
  }

  /**
   * Prints a blank line, the current timestamp and then one line per
   * `(countryCode, count)` pair.
   *
   * @param top5Countries pairs to print, in the order given
   */
  def printStats(top5Countries: Array[(String, Long)]): Unit = {
    println()
    println(new SimpleDateFormat("yyyy-MM-dd  HH:mm:ss").format(new Date()))
    top5Countries.foreach(println)
  }
}
开发者ID:adrianulbona,项目名称:ve-challenge,代码行数:45,代码来源:VeChallengeRealTime.scala


示例3: TwitterStream

//设置package包名称以及导入依赖的类
package io.gzet.timeseries

import com.google.gson.GsonBuilder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.{Logging, SparkConf, SparkContext}
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

import scala.util.Try

/**
 * Pulls tweets from the Twitter streaming API, serialises each one to JSON
 * with Gson and stores every five-minute batch as text files under the
 * `twitter` prefix.
 */
object TwitterStream extends SimpleConfig with Logging {

  /**
   * Creates a Twitter input stream authenticated with the four OAuth values
   * referenced below (`twitterApiKey`, `twitterApiSecret`, `twitterTokenKey`,
   * `twitterTokenSecret`), which are not defined in this object.
   *
   * @param ssc     streaming context the stream is attached to
   * @param filters keywords passed to Twitter; defaults to no filter
   * @return the stream created by `TwitterUtils.createStream`, stored with
   *         `StorageLevel.MEMORY_ONLY`
   */
  def getTwitterStream(ssc: StreamingContext, filters: Seq[String] = Nil) = {
    val oauthConfig = new ConfigurationBuilder()
      .setOAuthConsumerKey(twitterApiKey)
      .setOAuthConsumerSecret(twitterApiSecret)
      .setOAuthAccessToken(twitterTokenKey)
      .setOAuthAccessTokenSecret(twitterTokenSecret)
      .build()
    val authorization = Some(new OAuthAuthorization(oauthConfig))

    TwitterUtils.createStream(ssc, authorization, filters, StorageLevel.MEMORY_ONLY)
  }

  /** Entry point; the program arguments are used as the Twitter filter keywords. */
  def main(args: Array[String]) = {

    val sparkContext = new SparkContext(new SparkConf().setAppName("Twitter Extractor"))
    val ssc = new StreamingContext(sparkContext, Minutes(5))

    val jsonTweets = getTwitterStream(ssc, args).mapPartitions { statuses =>
      // One Gson instance per partition rather than per tweet.
      val gson = new GsonBuilder().create()
      statuses
        .map(status => Try(gson.toJson(status)))
        // Tweets whose serialisation failed are dropped.
        .collect { case scala.util.Success(json) => json }
    }

    jsonTweets.saveAsTextFiles("twitter")

    // Start streaming context
    ssc.start()
    ssc.awaitTermination()

  }

}
开发者ID:PacktPublishing,项目名称:Mastering-Spark-for-Data-Science,代码行数:55,代码来源:TwitterStream.scala


示例4: Tweet

//设置package包名称以及导入依赖的类
// Shell-style script: loads Twitter OAuth credentials from config.properties,
// opens a tweet stream filtered on "#spark" and prints the text of incoming
// tweets for every three-second batch.
// `sc` is not defined here; it must be provided by the environment that runs
// this script.
println("")
println("*************************")
println("Tweet sample")
println("*************************")
println("")
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}
import org.apache.log4j.{Level, Logger}

Logger.getRootLogger.setLevel(Level.ERROR)

val config = new java.util.Properties
val configStream = this.getClass().getClassLoader().getResourceAsStream("config.properties")
// getResourceAsStream returns null when the resource is missing; fail with a clear message.
require(configStream != null, "config.properties was not found on the classpath")
try config.load(configStream) finally configStream.close()

// Copy each twitter_<name> entry into the matching twitter4j.oauth.<name> system property.
Seq("consumerKey", "consumerSecret", "accessToken", "accessTokenSecret").foreach { name =>
  val value = config.getProperty("twitter_" + name)
  require(value != null, "Missing property twitter_" + name + " in config.properties")
  System.setProperty("twitter4j.oauth." + name, value)
}


println("")
println("Create stream")
println("=============")
println("")
val filters = Array("#spark")
val ssc = new StreamingContext(sc, Seconds(3))
val stream = TwitterUtils.createStream(ssc, None, filters)

println("")
println("Extract texts and print")
println("=======================")
println("")
stream.map(status => status.getText()).print()

println("")
println("Start")
println("=====")
println("")
ssc.start()
ssc.awaitTermination()
开发者ID:dobachi,项目名称:spark-sample-scripts,代码行数:41,代码来源:Tweet.scala


示例5: TwitterSentiment

//设置package包名称以及导入依赖的类
import com.databricks.spark.corenlp.functions._
import org.apache.spark._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{current_timestamp, explode}
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

/**
 * Streams tweets matching "trump" or "donald", cleans their text, runs the
 * CoreNLP column functions imported by this file (sentence split, tokenize,
 * NER, sentiment) on every one-second batch and appends the result to the
 * Cassandra table `sentiment.storm_doris`.
 */
object TwitterSentiment {
  // Set when this object is initialised, i.e. before `main` builds the SparkConf.
  System.setProperty("spark.cassandra.connection.host", "127.0.0.1")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TwitterSentiment").setMaster("local[*]")

    val spark = SparkSession.builder.config(conf).getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
    // Create a Twitter Stream for the input source.
    val auth = Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
    val twitterStream = TwitterUtils.createStream(ssc, auth, Array("trump", "donald"))

    val tweets = twitterStream
      // Keep English tweets and tweets with an empty language tag. A null
      // language is treated like an empty one instead of throwing an NPE.
      .filter(tweet => Option(tweet.getLang).forall(lang => lang == "en" || lang.isEmpty))
      .map(tweet => cleanText(tweet.getText()))

    tweets.foreachRDD { rdd =>
      import spark.implicits._
      val df = rdd.toDF()

      val output = df.select(cleanxml('value).as('doc))
        .select(explode(ssplit('doc)).as('sen))
        .select('sen, tokenize('sen).as('words), ner('sen).as('nerTags), sentiment('sen).as('sentiment))

      // Only the sentence, its sentiment and the processing time are stored.
      val formatted = output
        .drop("words")
        .drop("nerTags")
        .withColumnRenamed("sen", "tweet")
        .withColumn("processed_time", current_timestamp())

      formatted
        .write
        .mode("append")
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> "storm_doris", "keyspace" -> "sentiment"))
        .save()
    }

    ssc.start()

    // Let's await the stream to end - forever
    ssc.awaitTermination()
  }

  /**
   * Applies the four text clean-up substitutions to a tweet, in order.
   *
   * NOTE(review): `String.replaceAll` takes a bare regular expression, so the
   * slashes in the first pattern are matched literally rather than acting as
   * delimiters. The pattern is left unchanged here; confirm what was intended.
   */
  private def cleanText(text: String): String =
    text
      .replaceAll("/[^A-Za-z0-9 ]/", "")
      .replaceAll("/", "")
      .replaceAll("RT.+?(?=\\s)\\s", "")
      .replaceAll("https([^\\s]+).*", "")
}
开发者ID:jamescross91,项目名称:sentiment-analysis,代码行数:61,代码来源:TwitterSentiment.scala


示例6: Setup

//设置package包名称以及导入依赖的类
package twitter

import com.typesafe.config.ConfigFactory
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory


/**
 * One-time wiring for the Twitter pipeline: publishes the Twitter OAuth
 * credentials as twitter4j system properties, then builds the SparkContext
 * and a one-second StreamingContext.
 *
 * Everything in the object body runs once, when the object is first accessed.
 */
object Setup {

  private val log = LoggerFactory.getLogger(getClass)
  val config = ConfigFactory.load()

  log.info("Setting up Twitter credentials")
  setupTwitter(config.getString("oauth.consumerKey"), config.getString("oauth.consumerSecret"),
               config.getString("oauth.accessToken"), config.getString("oauth.accessTokenSecret"))

  log.info("Starting Spark")
  // All settings, including the Kryo ones, are applied before the SparkContext
  // is created: the context works on its own copy of the configuration, so
  // changes made to `conf` afterwards would not reach it.
  val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("Twitter pipeline")
    .set("spark.executor.memory", "1g")
    .set("spark.rdd.compress", "true")
    .set("spark.storage.memoryFraction", "1")
    .set("spark.streaming.unpersist", "true")
    .set("spark.streaming.receiver.writeAheadLog.enable", "false")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .registerKryoClasses(Array(classOf[Tweet]))
  val sc   = new SparkContext(conf)
  val ssc  = new StreamingContext(sc, Seconds(1))

  /**
   * Publishes the OAuth credentials and the twitter4j endpoint URLs as JVM
   * system properties.
   *
   * @param consumerKey       value for `twitter4j.oauth.consumerKey`
   * @param consumerSecret    value for `twitter4j.oauth.consumerSecret`
   * @param accessToken       value for `twitter4j.oauth.accessToken`
   * @param accessTokenSecret value for `twitter4j.oauth.accessTokenSecret`
   */
  def setupTwitter(consumerKey: String, consumerSecret: String, accessToken: String, accessTokenSecret: String) ={
    // Set up the system properties for twitter
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
    // All endpoints below are https.
    System.setProperty("twitter4j.restBaseURL", "https://api.twitter.com/1.1/")
    System.setProperty("twitter4j.streamBaseURL", "https://stream.twitter.com/1.1/")
    System.setProperty("twitter4j.siteStreamBaseURL", "https://sitestream.twitter.com/1.1/")
    System.setProperty("twitter4j.userStreamBaseURL", "https://userstream.twitter.com/1.1/")
    System.setProperty("twitter4j.oauth.requestTokenURL", "https://api.twitter.com/oauth/request_token")
    System.setProperty("twitter4j.oauth.accessTokenURL", "https://api.twitter.com/oauth/access_token")
    System.setProperty("twitter4j.oauth.authorizationURL", "https://api.twitter.com/oauth/authorize")
    System.setProperty("twitter4j.oauth.authenticationURL", "https://api.twitter.com/oauth/authenticate")
    System.setProperty("sync.numThreads", "4")
    System.setProperty("jsonStoreEnabled", "true")
  }

  /** Creates an unfiltered Twitter stream on `ssc`; no explicit authorization is passed. */
  def createStream = {
    log.info("Creating Twitter stream")
    TwitterUtils.createStream(ssc, None)
  }
}
开发者ID:airtonjal,项目名称:DevCamp-2016,代码行数:58,代码来源:Setup.scala


示例7: StoreToHdfs

//设置package包名称以及导入依赖的类
package com.godatadriven.twitter_classifier

import com.google.gson.Gson
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}


/**
 * Streams tweets, serialises each one to JSON with Gson and writes every
 * non-empty 60-second batch as a single-partition text output under
 * `/tweets/tweets_<batch time in ms>`.
 */
object StoreToHdfs {
  // Running total of tweets written so far; updated inside foreachRDD.
  private var numTweetsCollected = 0L
  private val gson = new Gson()

  def main(args: Array[String]): Unit = {
    // Process program arguments and set properties

    println("Initializing Streaming Spark Context...")
    // The class behind a Scala object is named with a trailing '$'; strip it
    // so the application name reads "StoreToHdfs".
    val appName = this.getClass.getSimpleName.stripSuffix("$")
    val conf = new SparkConf().setAppName(appName)
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(60))

    val tweetStream = TwitterUtils.createStream(ssc, None)
      .map(status => gson.toJson(status))

    tweetStream.foreachRDD { (rdd, time) =>
      val count = rdd.count()
      // Empty batches are skipped, so no output is written for them.
      if (count > 0) {
        rdd.repartition(1).saveAsTextFile("/tweets/tweets_" + time.milliseconds.toString)
        numTweetsCollected += count
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
开发者ID:rweverwijk,项目名称:twitter-to-neo4j,代码行数:38,代码来源:StoreToHdfs.scala


示例8: LiveTweetResourcesSparkController

//设置package包名称以及导入依赖的类
package controllers.spark.live

import javax.inject.Inject

import bigdata.engines.spark.SparkStreaming
import com.google.gson.Gson
import org.apache.spark.sql.DataFrame
import org.apache.spark.streaming.twitter.TwitterUtils
import play.api.mvc.{Action, Controller}
import services.twitter.Twitter4JConfiguration

/**
 * Play controller that starts and stops a live tweet stream on the shared
 * `SparkStreaming.streamingContext`.
 *
 * @param config application configuration; `hadoop-tweets-url` is read from it
 */
class LiveTweetResourcesSparkController @Inject() (config: play.api.Configuration) extends Controller{

  /**
   * Creates a tweet stream, saves every batch as JSON text under
   * `<hadoop-tweets-url>tweet_<batch time in ms>` in four partitions, prints a
   * sample of each batch and starts the streaming context.
   */
  def listSampleTweets = Action {
    val twitterInstance = new Twitter4JConfiguration(config).getTwitter4JAccess()
    // Resolve the output location once, up front, so a missing key fails this
    // request with a clear message instead of failing inside every batch.
    val outputBaseUrl = config.getString("hadoop-tweets-url").getOrElse(
      throw new IllegalStateException("Missing configuration key: hadoop-tweets-url"))

    val tweetStream = TwitterUtils
      .createStream(SparkStreaming.streamingContext, Option(twitterInstance.getAuthorization))
      .mapPartitions { statuses =>
        // One Gson instance per partition rather than one per tweet.
        val gson = new Gson()
        statuses.map(status => gson.toJson(status))
      }

    tweetStream.foreachRDD { (rdd, time) =>
      rdd.repartition(4).saveAsTextFile(outputBaseUrl + "tweet_" + time.milliseconds.toString)
    }

    tweetStream.print()
    SparkStreaming.streamingContext.start()
    Ok("started streaming")
  }

  /** Stops the streaming context gracefully, together with its SparkContext. */
  def stopStreaming = Action{
    SparkStreaming.streamingContext.stop(stopSparkContext = true, stopGracefully = true)
    Ok("stopped streaming")
  }

  /** Collects the rows of `rdd` as JSON strings and joins them with ",\n". */
  private def parse(rdd: DataFrame): String = rdd.toJSON.collect.toList.mkString(",\n")
}
开发者ID:szymonlyszkowski,项目名称:big-data-runner,代码行数:38,代码来源:LiveTweetResourcesSparkController.scala



注:本文中的org.apache.spark.streaming.twitter.TwitterUtils类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala ForeignKeyQuery类代码示例发布时间:2022-05-23
下一篇:
Scala ActionEvent类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap