
Scala StreamingContext Class Code Examples


This article collects typical usage examples of org.apache.spark.streaming.StreamingContext in Scala. If you are wondering how the StreamingContext class is used in practice, or are looking for concrete examples to follow, the curated code samples below should help.



A total of 20 code examples of the StreamingContext class are presented below, sorted by popularity by default.
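Before the examples, a minimal sketch of the StreamingContext lifecycle that all of the samples below follow may be useful: build a SparkConf, create the context with a batch interval, declare at least one DStream with an output operation, then start the context and await termination. The object name, app name, master URL, host and port in this sketch are placeholder values, not taken from any of the examples below.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MinimalStreamingContextSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder configuration: local master with two threads, 5-second batches.
    val conf = new SparkConf().setAppName("MinimalStreamingContextSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // At least one output operation (print, saveAsTextFiles, foreachRDD, ...) must be registered,
    // otherwise ssc.start() fails with "No output operations registered".
    ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split("""\s+"""))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()            // begin receiving and processing batches
    ssc.awaitTermination() // block until the streaming job is stopped
  }
}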

Example 1: KafkaUtility

// Package declaration and imported dependencies
package com.knoldus.streaming.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}


object KafkaUtility {

  //TODO It should read from config
  private val kafkaParams = Map(
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "auto.offset.reset" -> "earliest",
    "group.id" -> "tweet-consumer"
  )

  private val preferredHosts = LocationStrategies.PreferConsistent


  def createDStreamFromKafka(ssc: StreamingContext, topics: List[String]): InputDStream[ConsumerRecord[String, String]] =
    KafkaUtils.createDirectStream[String, String](
      ssc,
      preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics.distinct, kafkaParams)
    )

} 
Developer: knoldus, Project: real-time-stream-processing-engine, Lines: 32, Source: KafkaUtility.scala


Example 2: KMeansClusteringApp

// Package declaration and imported dependencies
package org.apress.prospark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.doubleRDDToDoubleRDDFunctions
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

object KMeansClusteringApp {

  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println(
        "Usage: KMeansClusteringApp <appname> <batchInterval> <hostname> <port>")
      System.exit(1)
    }
    val Seq(appName, batchInterval, hostname, port) = args.toSeq

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)

    val ssc = new StreamingContext(conf, Seconds(batchInterval.toInt))

    val substream = ssc.socketTextStream(hostname, port.toInt)
      .filter(!_.contains("NaN"))
      .map(_.split(" "))
      .filter(f => f(1) != "0")

    val orientationStream = substream
      .map(f => Seq(1, 4, 5, 6, 10, 11, 12, 20, 21, 22, 26, 27, 28, 36, 37, 38, 42, 43, 44).map(i => f(i)).toArray)
      .map(arr => arr.map(_.toDouble))
      .filter(f => f(0) == 1.0 || f(0) == 2.0 || f(0) == 3.0)
      .map(f => LabeledPoint(f(0), Vectors.dense(f.slice(1, f.length))))
    val test = orientationStream.transform(rdd => rdd.randomSplit(Array(0.3, 0.7))(0))
    val train = orientationStream.transformWith(test, (r1: RDD[LabeledPoint], r2: RDD[LabeledPoint]) => r1.subtract(r2)).cache()
    val model = new StreamingKMeans()
      .setK(3)
      .setDecayFactor(0)
      .setRandomCenters(18, 0.0)

    model.trainOn(train.map(v => v.features))
    val prediction = model.predictOnValues(test.map(v => (v.label, v.features)))
    prediction.print() // materialize the predictions; without an output operation this DStream is never computed

    ssc.start()
    ssc.awaitTermination()
  }

} 
Developer: ZubairNabi, Project: prosparkstreaming, Lines: 54, Source: L9-10KMeans.scala


Example 3: RedditVariationApp

// Package declaration and imported dependencies
package org.apress.prospark

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Milliseconds, Seconds, StreamingContext }
import org.apache.hadoop.io.{ Text, LongWritable, IntWritable }
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.dstream.DStream
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{ TextOutputFormat => NewTextOutputFormat }
import org.apache.spark.streaming.dstream.PairDStreamFunctions
import org.apache.log4j.LogManager
import org.json4s._
import org.json4s.native.JsonMethods._
import java.text.SimpleDateFormat
import java.util.Date

object RedditVariationApp {
  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println(
        "Usage: RedditVariationApp <appname> <input_path>")
      System.exit(1)
    }
    val Seq(appName, inputPath) = args.toSeq
    val LOG = LogManager.getLogger(this.getClass)

    val conf = new SparkConf()
      .setAppName(appName)
      .setJars(SparkContext.jarOfClass(this.getClass).toSeq)

    val ssc = new StreamingContext(conf, Seconds(1))
    LOG.info("Started at %d".format(ssc.sparkContext.startTime))

    val comments = ssc.fileStream[LongWritable, Text, TextInputFormat](inputPath, (f: Path) => true, newFilesOnly = false).map(pair => pair._2.toString)

    val merged = comments.union(comments)

    val repartitionedComments = comments.repartition(4)

    val rddMin = comments.glom().map(arr =>
      arr.minBy(rec => ((parse(rec) \ "created_utc").values.toString.toInt)))
    // Spark Streaming requires at least one registered output operation,
    // otherwise ssc.start() fails with "No output operations registered".
    rddMin.print()

    ssc.start()
    ssc.awaitTermination()

  }
} 
Developer: ZubairNabi, Project: prosparkstreaming, Lines: 50, Source: L3-DStreamVariation.scala


Example 4: Consumer

// Package declaration and imported dependencies
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.mllib.classification.SVMModel
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SparkSession

object Consumer {

  def main(args: Array[String]): Unit = {

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("streaming")

    val sparkConf = new SparkConf().setMaster("local[8]").setAppName("KafkaTest")
    val streamingContext = new StreamingContext(sparkConf, Seconds(1))
    // Create an input direct stream from Kafka
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val sc = SparkSession.builder().master("local[8]").appName("KafkaTest").getOrCreate()
    val model = SVMModel.load(sc.sparkContext, "/home/xiaoyu/model")
    val result = kafkaStream.map(record => (record.key, record.value))
    result.foreachRDD(
      patient => {
        patient.collect().toBuffer.foreach(
          (x: (Any, String)) => {
            val features = x._2.split(',').map(x => x.toDouble).tail
            println(model.predict(Vectors.dense(features)))

          }
        )
      }
    )

    streamingContext.start()
    streamingContext.awaitTermination()

  }
} 
Developer: XiaoyuGuo, Project: DataFusionClass, Lines: 55, Source: Consumer.scala


Example 5: User

// Package declaration and imported dependencies
package services.users

import java.io.File
import java.nio.charset.Charset
import java.sql.Timestamp

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.StreamingContext
import services.events.EventStream
import services.Util


case class User(userId: String, testFinishTime: Timestamp, nickname: String, gender: String)

object User {
  val DELIMITER = ','
  val USER_FEED = "/Users/mahesh/data/affinitas/feeds/users/"
  val USER_DATA = "/Users/mahesh/data/affinitas/tables/users/"

  var ssc: StreamingContext = null
  var sql: SparkSession = null


  lazy val usersFeedDF = sql.read
    .format("com.databricks.spark.csv")
    .option("header", false)
    .schema(StructType(Array(
      StructField("userId", StringType, true),
      StructField("nickname", StringType, true),
      StructField("gender", StringType, true)
    )
    )).load(User.USER_FEED)

  //EventStream.testFinishStream.print()
  lazy val usersMap = usersFeedDF.rdd.map(record => (record.getString(0), (record.getString(1), record.getString(2))))


  def initialize(sscO: StreamingContext, sqlO: SparkSession) = {
    ssc = sscO
    sql = sqlO

    new File(USER_FEED).mkdirs()
    new File(USER_DATA).mkdirs()

    EventStream.testFinishStream.foreachRDD( {
      rdd => {
        val testFinishMap = rdd.map(record => (record.userId, record.timestamp))
        val userData = testFinishMap.join(usersMap)
          .map(record => Array(record._1, record._2._1, record._2._2._1, record._2._2._2))
          .collect()
        Util.writeCsvToDir(userData, DELIMITER.toString, USER_DATA)
      }
    })
  }
} 
Developer: f13mash, Project: spark_log_contact, Lines: 58, Source: User.scala


Example 6: VeChallengeIngest

// Package declaration and imported dependencies
package io.github.adrianulbona.ve

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import twitter4j.{GeoLocation, Place, Status}


object VeChallengeIngest {

  case class Location(latitude: Double, longitude: Double)

  case class Tweet(time: Long, text: String, user: String, isRetweet: Boolean, country: String, location: Location)

  def main(args: Array[String]) {

    val spark = SparkSession.builder
      .master("local[*]")
      .appName("ve-challenge")
      .getOrCreate()

    import spark.sqlContext.implicits._

    val ssc = new StreamingContext(spark.sparkContext, Minutes(2))
    val stream = TwitterUtils.createStream(ssc, None, Seq("challenge"))

    stream.map(extract).map(normalize).foreachRDD((batch, time) => {
      val batchDF: DataFrame = batch.toDF.cache
      batchDF.groupBy($"country").count().toDF("country", "count").orderBy($"count".desc).show(6)
      batchDF.coalesce(1).write.parquet("tweets/batch=" + time.milliseconds)
      batchDF.unpersist()
    })

    ssc.start()
    ssc.awaitTermination()

    spark.stop()
  }

  def extract(status: Status): (Long, String, String, Boolean, Option[Place], Option[GeoLocation]) = {
    (status.getCreatedAt.getTime,
      status.getText,
      status.getUser.getName,
      status.isRetweet,
      Option(status.getPlace),
      Option(status.getGeoLocation))
  }

  def normalize(extract: (Long, String, String, Boolean, Option[Place], Option[GeoLocation])): Tweet = extract match {
    case (time: Long, text: String, user: String, isRetweet: Boolean, Some(place: Place), Some(geoLoc: GeoLocation)) =>
      Tweet(time, text, user, isRetweet, place.getCountryCode, Location(geoLoc.getLatitude, geoLoc.getLongitude))
    case (time: Long, text: String, user: String, isRetweet: Boolean, Some(place: Place), None) =>
      Tweet(time, text, user, isRetweet, place.getCountryCode, Location(Double.NaN, Double.NaN))
    case (time: Long, text: String, user: String, isRetweet: Boolean, None, Some(geoLoc: GeoLocation)) =>
      Tweet(time, text, user, isRetweet, "unknown", Location(geoLoc.getLatitude, geoLoc.getLongitude))
    case (time: Long, text: String, user: String, isRetweet: Boolean, None, None) =>
      Tweet(time, text, user, isRetweet, "unknown", Location(Double.NaN, Double.NaN))
  }
} 
Developer: adrianulbona, Project: ve-challenge, Lines: 60, Source: VeChallengeIngest.scala


Example 7: KafkaPayload

// Package declaration and imported dependencies
package tools

import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

case class KafkaPayload(value: Array[Byte])

class KafkaDStreamSource(config: Map[String, String]) {

  def createSource(ssc: StreamingContext, topic: String): DStream[KafkaPayload] = {
    val kafkaParams = config
    val kafkaTopics = Set(topic)

    KafkaUtils.
      createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
      ssc,
      kafkaParams,
      kafkaTopics).
      map(dStream => KafkaPayload(dStream._2))
  }

}

object KafkaDStreamSource {
  def apply(config: Map[String, String]): KafkaDStreamSource = new KafkaDStreamSource(config)
} 
Developer: Antwnis, Project: kafka-streaming-examples, Lines: 29, Source: KafkaDStreamSource.scala


Example 8: Collector

// Package declaration and imported dependencies
package com.databricks.apps.twitterClassifier

import com.google.gson.Gson
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.twitter.TwitterUtils


object Collector {
  def doIt(options: CollectOptions, sc: SparkContext, ssc: StreamingContext) {
    val tweetStream: DStream[String] = TwitterUtils.createStream(ssc, maybeTwitterAuth)
      .map(new Gson().toJson(_))

    var numTweetsCollected = 0L
    tweetStream.foreachRDD { (rdd, time) =>
      val count = rdd.count
      if (count > 0) {
        rdd.saveAsTextFile(options.tweetDirectory.getAbsolutePath)
        numTweetsCollected += count
        if (numTweetsCollected > options.numTweetsToCollect) System.exit(0)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
} 
Developer: krish121, Project: Spark-reference-applications, Lines: 29, Source: Collect.scala


Example 9: Predict

// Package declaration and imported dependencies
package com.databricks.apps.twitterClassifier

import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Predict extends App {
  import SparkSetup._

  val options = PredictOptions.parse(args)
  val ssc = new StreamingContext(sc, Seconds(options.intervalInSecs))
  Predictor.doIt(options, sc, ssc)
}


object Predictor {
  def doIt(options: PredictOptions, sc: SparkContext, ssc: StreamingContext) {
    println("Initializing the the KMeans model...")
    val model: KMeansModel = new KMeansModel(sc.objectFile[Vector](options.modelDirectory.getCanonicalPath).collect)

    println("Materializing Twitter stream...")
    TwitterUtils.createStream(ssc, maybeTwitterAuth)
      .map(_.getText)
      .foreachRDD { rdd =>
        rdd.filter(t => model.predict(featurize(t)) == options.clusterNumber)
           .foreach(print)  // register DStream as an output stream and materialize it
      }
    println("Initialization complete, starting streaming computation.")
    ssc.start()
    ssc.awaitTermination()
  }
} 
Developer: krish121, Project: Spark-reference-applications, Lines: 35, Source: Predict.scala


Example 10: StreamingWordCount

// Package declaration and imported dependencies
package org.examples.scala.examples

import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.spark._
import org.apache.spark.SparkContext._


object StreamingWordCount {
  def run(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage BasicStreamingExample <master> <output>")
    }
    val Array(master, output) = args.take(2)

    val conf = new SparkConf().setMaster(master).setAppName("BasicStreamingExample")
    val ssc = new StreamingContext(conf, Seconds(30))

    val lines = ssc.socketTextStream("localhost" , 7777)
    val words = lines.flatMap(_.split(" "))
    val wc = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)

    wc.saveAsTextFiles(output)
    wc.print

    println("pandas: sscstart")
    ssc.start()
    println("pandas: awaittermination")
    ssc.awaitTermination()
    println("pandas: done!")
  }
} 
Developer: jjmleiro, Project: learning-spark, Lines: 33, Source: StreamingWordCount.scala


Example 11: KafkaStreaming

// Package declaration and imported dependencies
package org.myorganization.spark.streaming


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder



object KafkaStreaming {
    def main(args: Array[String]): Unit = {
        val (batchDuration, topics, bootstrapServers) = getParams(args)

        val conf = new SparkConf().setAppName("gpKafkaStreaming")
        val sc   = new SparkContext(conf)
        val ssc  = new StreamingContext(sc, Seconds(batchDuration))

        val topicsSet   = topics.split(",").toSet
        val kafkaParams = Map[String, String]("bootstrap.servers" -> bootstrapServers, "auto.offset.reset" -> "smallest")
        val messages    = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

        val data                 = messages.map(_._2)
        val loggerSerializerLogs = data.map(_.split("""\s+"""))
                                       .filter(x => x.length > 6)
                                       .map(x => (x(0), x(6)))
                                       .filter(filterLogLines)
                                       .map(x => x._1)
        val logCounts            = loggerSerializerLogs.map(x => (x, 1L)).reduceByKey(_ + _)
        logCounts.print(10)

        ssc.start()
        ssc.awaitTermination()
    }


    def filterLogLines(line: Tuple2[String, String]): Boolean = {
        val pattern = """logger.+"""
        line._2.matches(pattern)
    }


    def getParams(args: Array[String]): Tuple3[Int, String, String] = {
        if (args.length != 3) {
            System.err.println(s"""
                |Usage: spark-kafka.sh <sampling-period> <topics> <bootstrap-servers>
                |  <sampling-period>    is the duration of each batch (in seconds)
                |  <topics>             is a list of one or more kafka topics to consume from
                |  <bootstrap-servers>  is a list of one or more Kafka bootstrap servers
                |
                """.stripMargin)
            System.exit(1)
        }
        Tuple3[Int, String, String](args(0).toInt, args(1), args(2))
    }
} 
Developer: gpapag, Project: spark-streaming-kafka, Lines: 57, Source: KafkaStreaming.scala


Example 12: TestUpdateStateByKey

// Package declaration and imported dependencies
package examples.streaming

import org.apache.spark.streaming.{StreamingContext, Duration}
import org.apache.spark.SparkConf


object TestUpdateStateByKey {
  val checkpointDir: String = "hdfs://localhost:9000/user/hduser/spark-chkpt"

  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getOrCreate(checkpointDir, createFunc _)

    ssc.start()
    ssc.awaitTermination()
  }

  def updateFunc(values: Seq[Int], state: Option[Int]): Option[Int] = {
    Some(values.size + state.getOrElse(0))
  }

  def createFunc(): StreamingContext = {
    val ssc = new StreamingContext(new SparkConf().setAppName("TestUpdateStateByKeyJob"),
      Duration(2000))

    ssc.checkpoint(checkpointDir)

    ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey(updateFunc _)
      .checkpoint(Duration(10000))
      .print()

    ssc
  }
} 
Developer: prithvirajbose, Project: spark-dev, Lines: 37, Source: TestUpdateStateByKey.scala


Example 13: TwitterStreaming

// Package declaration and imported dependencies
package local.riverside

import org.apache.lucene.analysis.ja.JapaneseAnalyzer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.spark.SparkConf
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Durations, StreamingContext}

object TwitterStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Twitter Streaming")
    val ssc = new StreamingContext(conf, Durations.minutes(1L))

    val filter = if (args.isEmpty) Nil else args.toList
    val stream = TwitterUtils.createStream(ssc, None, filter)

    stream
      .flatMap { status =>
        val text = status.getText

        val analyzer = new JapaneseAnalyzer
        val tokenStream = analyzer.tokenStream("", text)
        val charAttr = tokenStream.addAttribute(classOf[CharTermAttribute])
     
        tokenStream.reset()

        try {
          Iterator.continually(tokenStream.incrementToken())
                  .takeWhile(identity)
                  .map(_ => charAttr.toString)
                  .toVector
        } finally {
          tokenStream.end()
          tokenStream.close()
        }
      }
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
      .saveAsTextFiles("output/tweet")

    ssc.start()
    ssc.awaitTermination()
  }
} 
Developer: ryumei, Project: twitter-spark-streaming-sample, Lines: 45, Source: TwitterStreaming.scala


Example 14: VeChallengeRealTime

// Package declaration and imported dependencies
package io.github.adrianulbona.ve

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Minutes, StreamingContext}
import twitter4j.Place


object VeChallengeRealTime {

  def main(args: Array[String]) {

    val spark = SparkSession.builder
      .master("local[*]")
      .appName("ve-challenge")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Minutes(2))
    val stream = TwitterUtils.createStream(ssc, None, Seq("challenge"))

    val places: DStream[Place] = stream.map(status => Option(status.getPlace))
      .filter(optionPlace => optionPlace.isDefined)
      .map(place => place.get)

    places.map(place => place.getCountryCode)
      .countByValue()
      .foreachRDD(batch => printStats(batch.sortBy({ case (_, count) => count }, ascending = false).take(5)))

    ssc.start()
    ssc.awaitTermination()

    spark.stop()
  }

  def printStats(top5Countries: Array[(String, Long)]) {
    println()
    println(new SimpleDateFormat("yyyy-MM-dd  HH:mm:ss").format(new Date()))
    top5Countries.foreach(println)
  }
} 
Developer: adrianulbona, Project: ve-challenge, Lines: 45, Source: VeChallengeRealTime.scala


Example 15: Main

// Package declaration and imported dependencies
import Fqueue.{FqueueReceiver, FqueueSender}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


object Main {
  private def sendData() = {
    val fqueuSender = new FqueueSender("localhost:18740", 4, 4000)
    fqueuSender.connect()
    while (true) {
      val ret = fqueuSender.enQueue("track_BOdao2015*", "123")
      Thread.sleep(1000)
    }

    fqueuSender.stop()
  }

  private def getData() = {
    val fqueueReceiver = new FqueueReceiver("localhost:18740", 4, 4000)
    fqueueReceiver.connect()
    val data = fqueueReceiver.deQueue("track_BOdao2015*")
    println(data.getOrElse("null"))
    fqueueReceiver.stop()
  }

  def main(args: Array[String]) {
    new Thread("fqueue sender") {
      override def run() { sendData() }
    }.start()
    val config = new SparkConf().setAppName("testfqueue").setMaster("local[2]")
    val ssc = new StreamingContext(config, Seconds(5))
    val lines = ssc.receiverStream(new FqueueStreamingReceiver("localhost:18740", 4, 4000))
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
} 
Developer: TopSpoofer, Project: FqueueStreamingReceiver, Lines: 38, Source: Main.scala


Example 16: SparkStreamKinesis

// Package declaration and imported dependencies
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.LATEST
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2
import org.apache.spark.streaming.kinesis._
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}


object SparkStreamKinesis{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark Kinesis").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Seconds(1))

    println("Spark Streaming")

    

    val kinesisStream = KinesisUtils.createStream(ssc, "sparrow-ci",
      "sparrow-ci",
      "kinesis.us-east-1.amazonaws.com",
      "us-east-1",
      LATEST,
      Duration(2000),
      MEMORY_AND_DISK_2)

    kinesisStream.print()

    // Each Kinesis record arrives as an Array[Byte]; decode it to a String
    // (map, not flatMap, which would produce a stream of individual characters).
    kinesisStream.map(bytes => new String(bytes))
      .foreachRDD(_.collect().foreach(println))

    ssc.start()
    ssc.awaitTermination()
  }
} 
Developer: noppanit, Project: spark-streaming-kinesis-example, Lines: 34, Source: main.scala


Example 17: SimpleDataStream

// Package declaration and imported dependencies
package com.fortysevendeg.log

import com.fortysevendeg.log.models._
import com.fortysevendeg.log.utils.Regex._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.language.postfixOps

object SimpleDataStream {

  def main(args: Array[String]) = {

    // run:
    // $ adb logcat -v time | nc -lk 9999

    // Spark configuration

    val conf = new SparkConf().setMaster("local[2]").setAppName("SimpleDataStream")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Milliseconds(1000))
    ssc.checkpoint("/tmp")

    val logLines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)

    val logs = logLines.flatMap { line =>
      for {
        typePlusAppPlusPid <- typePlusAppPlusPid.findFirstIn(line)
        data = extractTypeAppPid(typePlusAppPlusPid)
        logType = data._1
        app <- data._2
        pid <- data._3
        date <- date.findFirstIn(line)
        message <- message.findFirstIn(line)
      } yield {
        LogLine(LogInfo(app, pid, logType, date), message.substring(2))
      }
    }

    logs foreachRDD (_.foreach { log =>
      println(s"${log.info.logType}: ${log.info.app}: ${log.message}")
    })

    ssc.start()
    ssc.awaitTermination()
  }
} 
Developer: javipacheco, Project: spark-android-log, Lines: 49, Source: SimpleDataStream.scala


Example 18: LogAnalyzerStreamingImportDirectory

// Package declaration and imported dependencies
package com.databricks.apps.logs.chapter2

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

import com.databricks.apps.logs.{ApacheAccessLog, LogAnalyzerRDD}


object LogAnalyzerStreamingImportDirectory extends App {
  val WINDOW_LENGTH = Seconds(30)
  val SLIDE_INTERVAL = Seconds(10)

  val spark = SparkSession
    .builder()
    .appName("Log Analyzer Import Streaming HDFS")
    .getOrCreate()
  val streamingContext = new StreamingContext(spark.sparkContext, SLIDE_INTERVAL)

  val directory = args(0)

  // This method monitors a directory for new files to read in for streaming.
  val logData: DStream[String] = streamingContext.textFileStream(directory)

  val accessLogsDStream: DStream[ApacheAccessLog] = logData.map(ApacheAccessLog.parseLogLine)
  val windowDStream: DStream[ApacheAccessLog] = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

  val logAnalyzerRDD = LogAnalyzerRDD(spark)
  windowDStream.foreachRDD(accessLogs => {
    if (accessLogs.count() == 0) {
      println("No access logs received in this time interval")
    } else {
      val logStatistics = logAnalyzerRDD.processRdd(accessLogs)
      logStatistics.printToStandardOut()
    }
  })

  // Start the streaming server.
  streamingContext.start() // Start the computation
  streamingContext.awaitTermination() // Wait for the computation to terminate
} 
Developer: krish121, Project: Spark-reference-applications, Lines: 42, Source: LogAnalyzerStreamingImportDirectory.scala


Example 19: Windows

// Package declaration and imported dependencies
package sw.streaming

import org.apache.spark._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds

object Windows extends App {

  val sparkConf = new SparkConf()
    .setAppName(this.getClass.getName)
    .setMaster("local[*]")
  val sc = new SparkContext(sparkConf)

  val interval = 10
  val ssc = new StreamingContext(sc, Seconds(interval))
  ssc.checkpoint("checkpoint")

  val lines = ssc.socketTextStream("localhost", 9999)
  val windowed = lines.window(Seconds(interval * 3))

  windowed.flatMap(_.split("""\W+"""))
    .map(w => (w, 1))
    .reduceByKey(_ + _)
    .print()

  ssc.start()

  ssc.awaitTermination()
} 
Developer: rabbitonweb, Project: spark-workshop, Lines: 33, Source: Windows.scala


Example 20: ReadingFromDirectory

// Package declaration and imported dependencies
package sw.streaming

import org.apache.spark._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds

object ReadingFromDirectory extends App {

  val sparkConf = new SparkConf()
    .setAppName(this.getClass.getName)
    .setMaster("local[*]")
  val sc = new SparkContext(sparkConf)

  val ssc = new StreamingContext(sc, Seconds(10))
  val logs = ssc.textFileStream("src/main/resources/logs/")

  logs.print()

  ssc.start()

  ssc.awaitTermination()
} 
Developer: rabbitonweb, Project: spark-workshop, Lines: 26, Source: ReadingFromDirectory.scala



Note: The org.apache.spark.streaming.StreamingContext examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by their authors, and copyright remains with the original authors; please consult each project's license before redistributing or using the code. Do not reproduce this article without permission.

