• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala DStream类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.spark.streaming.dstream.DStream的典型用法代码示例。如果您正苦于以下问题:Scala DStream类的具体用法?Scala DStream怎么用?Scala DStream使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了DStream类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: KafkaPayload

//设置package包名称以及导入依赖的类
package tools

import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

case class KafkaPayload(value: Array[Byte])

class KafkaDStreamSource(config: Map[String, String]) {

  def createSource(ssc: StreamingContext, topic: String): DStream[KafkaPayload] = {
    val kafkaParams = config
    val kafkaTopics = Set(topic)

    KafkaUtils.
      createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
      ssc,
      kafkaParams,
      kafkaTopics).
      map(dStream => KafkaPayload(dStream._2))
  }

}

object KafkaDStreamSource {
  def apply(config: Map[String, String]): KafkaDStreamSource = new KafkaDStreamSource(config)
} 
开发者ID:Antwnis,项目名称:kafka-streaming-examples,代码行数:29,代码来源:KafkaDStreamSource.scala


示例2: Collector

//设置package包名称以及导入依赖的类
package com.databricks.apps.twitterClassifier

import com.google.gson.Gson
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.twitter.TwitterUtils


object Collector {
  def doIt(options: CollectOptions, sc: SparkContext, ssc: StreamingContext) {
    val tweetStream: DStream[String] = TwitterUtils.createStream(ssc, maybeTwitterAuth)
      .map(new Gson().toJson(_))

    var numTweetsCollected = 0L
    tweetStream.foreachRDD { (rdd, time) =>
      val count = rdd.count
      if (count > 0) {
        rdd.saveAsTextFile(options.tweetDirectory.getAbsolutePath)
        numTweetsCollected += count
        if (numTweetsCollected > options.numTweetsToCollect) System.exit(0)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
} 
开发者ID:krish121,项目名称:Spark-reference-applications,代码行数:29,代码来源:Collect.scala


示例3: Combinators

//设置package包名称以及导入依赖的类
package com.stratio.ioft.streaming.transformations

import com.stratio.ioft.domain._
import com.stratio.ioft.domain.measures.{Acceleration, Attitude}
import com.stratio.ioft.domain.states.AttitudeHistory
import com.stratio.ioft.util.Math.Geometry._
import org.apache.spark.streaming.dstream.DStream
import com.stratio.ioft.domain.measures.VectorMeasure._
import com.stratio.ioft.streaming.transformations.Aggregators.attitudeHistoryStream

import org.apache.spark.streaming.Milliseconds

object Combinators {

  
  def desiredAndActualAttitudeStream(
                                      desiredAttitudeStream: DStream[(DroneIdType, (BigInt, Attitude))],
                                      attitudeStream: DStream[(DroneIdType, (BigInt, Attitude))],
                                      timeRange: Long
                                    ): DStream[(DroneIdType, (BigInt, Attitude, Attitude))] = {

    def windowed(stream: DStream[(DroneIdType, (BigInt, Attitude))]) = {
      val windowDuration = Milliseconds(timeRange)
      stream.window(windowDuration, windowDuration)
    }

    windowed(desiredAttitudeStream) join attitudeHistoryStream(windowed(attitudeStream)) flatMap {
      case (id, ( (ts, desired), actualAttitudeHistory)) =>
        val closestAttitudes = actualAttitudeHistory.attitudeAt(ts)
        closestAttitudes.headOption map { _ =>
          val (_, actualAttitude: Attitude) = closestAttitudes.minBy {
            case (actual_ts, _) => math.abs((actual_ts-ts).toLong)
          }
          id -> (ts, desired, actualAttitude)
        }
    }

  }

} 
开发者ID:pfcoperez,项目名称:sparkstream_ioft,代码行数:41,代码来源:Combinators.scala


示例4: VeChallengeRealTime

//设置package包名称以及导入依赖的类
package io.github.adrianulbona.ve

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Minutes, StreamingContext}
import twitter4j.Place


object VeChallengeRealTime {

  def main(args: Array[String]) {

    val spark = SparkSession.builder
      .master("local[*]")
      .appName("ve-challenge")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Minutes(2))
    val stream = TwitterUtils.createStream(ssc, None, Seq("challenge"))

    val places: DStream[Place] = stream.map(status => Option(status.getPlace))
      .filter(optionPlace => optionPlace.isDefined)
      .map(place => place.get)

    places.map(place => place.getCountryCode)
      .countByValue()
      .foreachRDD(batch => printStats(batch.sortBy({ case (_, count) => count }, ascending = false).take(5)))

    ssc.start()
    ssc.awaitTermination()

    spark.stop()
  }

  def printStats(top5Countries: Array[(String, Long)]) {
    println()
    println(new SimpleDateFormat("yyyy-MM-dd  HH:mm:ss").format(new Date()))
    top5Countries.foreach(println)
  }
} 
开发者ID:adrianulbona,项目名称:ve-challenge,代码行数:45,代码来源:VeChallengeRealTime.scala


示例5: LogAnalyzerWindowed

//设置package包名称以及导入依赖的类
package com.databricks.apps.logs

import scala.math.Ordering

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream


class LogAnalyzerWindowed(val windowLength: Long, val slideInterval: Long) extends AnalyzeFunctions with Serializable {

  import LogStatistics.EMPTY_LOG_STATISTICS

  var logStatistics = EMPTY_LOG_STATISTICS

  def processAccessLogs(accessLogsDStream: DStream[ApacheAccessLog]): Unit = {
    val windowDStream: DStream[ApacheAccessLog] = accessLogsDStream
      .window(Seconds(windowLength), Seconds(slideInterval))
    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.count() == 0) {
        logStatistics = EMPTY_LOG_STATISTICS
      } else {
        logStatistics = LogStatistics(contentSizeStats(accessLogs).get,
          responseCodeCount(accessLogs).take(100).toMap,
          filterIPAddress(ipAddressCount(accessLogs)).take(100),
          endpointCount(accessLogs).top(10)(Ordering.by[(String, Long), Long](_._2)).toMap)
      }
    })
  }

  def getLogStatistics: LogStatistics = logStatistics
} 
开发者ID:krish121,项目名称:Spark-reference-applications,代码行数:32,代码来源:LogAnalyzerWindowed.scala


示例6: LogAnalyzerStreamingImportDirectory

//设置package包名称以及导入依赖的类
package com.databricks.apps.logs.chapter2

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

import com.databricks.apps.logs.{ApacheAccessLog, LogAnalyzerRDD}


object LogAnalyzerStreamingImportDirectory extends App {
  val WINDOW_LENGTH = Seconds(30)
  val SLIDE_INTERVAL = Seconds(10)

  val spark = SparkSession
    .builder()
    .appName("Log Analyzer Import Streaming HDFS")
    .getOrCreate()
  val streamingContext = new StreamingContext(spark.sparkContext, SLIDE_INTERVAL)

  val directory = args(0)

  // This method monitors a directory for new files to read in for streaming.
  val logData: DStream[String] = streamingContext.textFileStream(directory)

  val accessLogsDStream: DStream[ApacheAccessLog] = logData.map(ApacheAccessLog.parseLogLine)
  val windowDStream: DStream[ApacheAccessLog] = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

  val logAnalyzerRDD = LogAnalyzerRDD(spark)
  windowDStream.foreachRDD(accessLogs => {
    if (accessLogs.count() == 0) {
      println("No access logs received in this time interval")
    } else {
      val logStatistics = logAnalyzerRDD.processRdd(accessLogs)
      logStatistics.printToStandardOut()
    }
  })

  // Start the streaming server.
  streamingContext.start() // Start the computation
  streamingContext.awaitTermination() // Wait for the computation to terminate
} 
开发者ID:krish121,项目名称:Spark-reference-applications,代码行数:42,代码来源:LogAnalyzerStreamingImportDirectory.scala


示例7: StreamActor

//设置package包名称以及导入依赖的类
package services

import akka.actor.Actor.Receive
import akka.actor.{Actor, ActorRef, Props}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import play.api.libs.json.Json
import services.StreamActor.TopHashtag
import twitter4j.Status


class StreamActor[T] (val out : ActorRef, val stream : DStream[T]) extends Actor {

  stream.foreachRDD { rdd =>
    rdd.take(5).foreach { element =>
      out ! element.toString
    }
  }

  override def receive: Receive = {
    case msg: String => out ! "Ok"
    case TopHashtag(top) => out ! Json.toJson(top)
  }

  @scala.throws[Exception](classOf[Exception])
  override def postStop(): Unit = {

  }
}

object StreamActor {
  def props[T](out: ActorRef, stream : DStream[T]) = Props(new StreamActor(out, stream))

  case class TopHashtag(top : Map[String, Int])
} 
开发者ID:OpenCompare,项目名称:live-pcm,代码行数:35,代码来源:StreamActor.scala


示例8: createStream

//设置package包名称以及导入依赖的类
package it.agilelab.bigdata.wasp.consumers.readers

import it.agilelab.bigdata.wasp.core.WaspSystem
import it.agilelab.bigdata.wasp.core.WaspSystem._
import it.agilelab.bigdata.wasp.core.kafka.CheckOrCreateTopic
import it.agilelab.bigdata.wasp.core.logging.WaspLogger
import it.agilelab.bigdata.wasp.core.models.{DefaultConfiguration, TopicModel}
import it.agilelab.bigdata.wasp.core.utils.{AvroToJsonUtil, ConfigManager, JsonToByteArrayUtil}
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils



  //TODO: check warning (not understood)
  def createStream(group: String, topic: TopicModel)(implicit ssc: StreamingContext): DStream[String] = {
    val kafkaConfig = ConfigManager.getKafkaConfig

    val kafkaConfigMap: Map[String, String] = Map(
      "zookeeper.connect" -> kafkaConfig.zookeeper.toString,
      "zookeeper.connection.timeout.ms" -> kafkaConfig.zookeeper.timeout.getOrElse(DefaultConfiguration.timeout).toString
    )


    if (??[Boolean](WaspSystem.getKafkaAdminActor, CheckOrCreateTopic(topic.name, topic.partitions, topic.replicas))) {
      val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
        ssc,
        kafkaConfigMap + ("group.id" -> group),
        Map(topic.name -> 3),
        StorageLevel.MEMORY_AND_DISK_2
      )

      topic.topicDataType match {
        case "avro" => receiver.map(x => (x._1, AvroToJsonUtil.avroToJson(x._2))).map(_._2)
        case "json" => receiver.map(x => (x._1, JsonToByteArrayUtil.byteArrayToJson(x._2))).map(_._2)
        case _ => receiver.map(x => (x._1, AvroToJsonUtil.avroToJson(x._2))).map(_._2)
      }

    } else {
      logger.error(s"Topic not found on Kafka: $topic")
      throw new Exception(s"Topic not found on Kafka: $topic")
    }
  }
} 
开发者ID:agile-lab-dev,项目名称:wasp,代码行数:47,代码来源:KafkaReader.scala


示例9: TwitterUtils

//设置package包名称以及导入依赖的类
package com.aluxian.tweeather.streaming

import java.util.Properties

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import twitter4j.auth.{AccessToken, Authorization}
import twitter4j.{FilterQuery, Status, TwitterFactory}

object TwitterUtils {

  
  def createMultiStream(ssc: StreamingContext,
                        queryBuilder: () => FilterQuery = () => null,
                        credentials: Seq[Authorization] = loadDefaultCredentials(),
                        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER
                       ): DStream[Status] = {
    credentials
      .map(auth => createStream(ssc, Some(queryBuilder()), Some(auth)))
      .reduce { (accStream, stream) => accStream.union(stream) }
  }

  private def loadDefaultCredentials(): Seq[Authorization] = {
    val props = loadTwitterProperties()
    val num = props.getProperty("twitter.credentials").toInt
    1.to(num).map(i => {
      val twitter = new TwitterFactory().getInstance()

      twitter.setOAuthConsumer(
        props.getProperty(s"twitter.credentials.$i.consumerKey"),
        props.getProperty(s"twitter.credentials.$i.consumerSecret")
      )

      twitter.setOAuthAccessToken(new AccessToken(
        props.getProperty(s"twitter.credentials.$i.token"),
        props.getProperty(s"twitter.credentials.$i.tokenSecret")
      ))

      twitter.getAuthorization
    })
  }

  private def loadTwitterProperties(): Properties = {
    val properties = new Properties()
    val stream = getClass.getResourceAsStream("/com/aluxian/tweeather/res/twitter.properties")
    properties.load(stream)
    stream.close()
    properties
  }

} 
开发者ID:cnajeefa,项目名称:Tourism-Sentiment-Analysis,代码行数:53,代码来源:TwitterUtils.scala


示例10: KafkaEngine

//设置package包名称以及导入依赖的类
package com.lljv.analytics.hadoopengine

import java.util.HashMap
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import scala.util.control.NonFatal


class KafkaEngine(val settings: KafkaSettings) extends Serializable {

  var producer: Option[KafkaProducer[String, String]] = None


  def getStreamingParameters(): Map[String, String] = {
    val parameters = Map[String, String](
      "metadata.broker.list" -> settings.kafkaBroker,
      "bootstrap.servers" -> settings.kafkaBroker,
      "key.serializer" -> settings.stringSerializerType,
      "value.serializer" -> settings.stringSerializerType,
      "key.deserializer" -> settings.stringDeserializerType,
      "value.deserializer" -> settings.stringDeserializerType,
      "group.id" -> settings.consumerGroupId
    )
    return parameters
  }

  def startStream(
                   topicName: String,
                   sparkEngine: SparkStreamEngine
                 ): Option[InputDStream[ConsumerRecord[String, String]]] =
  {
    val stream: Option[InputDStream[ConsumerRecord[String, String]]] = try {
      Some(KafkaUtils.createDirectStream[String,String](
        sparkEngine.streamingContext.get,
        PreferConsistent,
        Subscribe[String, String](Array(topicName), this.getStreamingParameters())
      ))
    } catch {
      case NonFatal(exc) => {
        // printf(exc.getMessage())
        // TODO: logging
        None
      }
    }
    return stream
  }

  

} 
开发者ID:dotdeb,项目名称:Science-Finder,代码行数:56,代码来源:KafkaEngine.scala


示例11: StreamingExample

//设置package包名称以及导入依赖的类
package com.pygmalios.reactiveinflux.spark.examples

import com.pygmalios.reactiveinflux._
import com.pygmalios.reactiveinflux.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime

import scala.concurrent.duration._


object StreamingExample extends App {
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Example")
  val ssc = new StreamingContext(conf, Seconds(1))

  val point1 = Point(
    time        = DateTime.now(),
    measurement = "measurement1",
    tags        = Map(
      "tagKey1" -> "tagValue1",
      "tagKey2" -> "tagValue2"),
    fields      = Map(
      "fieldKey1" -> "fieldValue1",
      "fieldKey2" -> 10.7)
  )

  // Provide settings for reactiveinflux
  implicit val params = ReactiveInfluxDbName("example")
  implicit val awaitAtMost = 1.second

  // Create DStream of Influx points
  val queue = new scala.collection.mutable.Queue[RDD[Point]]
  val queueStream: DStream[Point] = ssc.queueStream(queue)

  // Add single RDD with a single Influx point to the DStream
  queue.enqueue(ssc.sparkContext.parallelize(Seq(point1)))

  // Save DStream to Influx
  queueStream.saveToInflux()

  // Start Spark streaming
  ssc.start()
  ssc.awaitTermination()
} 
开发者ID:pygmalios,项目名称:reactiveinflux-spark,代码行数:49,代码来源:StreamingExample.scala


示例12: PointDStreamExtensionsSpec

//设置package包名称以及导入依赖的类
package com.pygmalios.reactiveinflux.extensions

import com.holdenkarau.spark.testing.StreamingActionBase
import com.pygmalios.reactiveinflux.spark._
import com.pygmalios.reactiveinflux._
import org.apache.spark.streaming.dstream.DStream
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class PointDStreamExtensionsSpec extends StreamingActionBase
  with BeforeAndAfterAll {
  import PointRDDExtensionsSpec._

  override def beforeAll: Unit = {
    super.beforeAll
    withInflux(_.create())
  }

  override def afterAll: Unit = {
    withInflux(_.drop())
    super.afterAll
  }

  test("write single point to Influx") {
    val points = List(point1)

    // Execute
    runAction(Seq(points), (dstream: DStream[Point]) => dstream.saveToInflux())

    // Assert
    val result = withInflux(
      _.query(Query(s"SELECT * FROM $measurement1")).result.singleSeries)

    assert(result.rows.size == 1)

    val row = result.rows.head
    assert(row.time == point1.time)
    assert(row.values.size == 5)
  }
} 
开发者ID:pygmalios,项目名称:reactiveinflux-spark,代码行数:43,代码来源:PointDStreamExtensionsSpec.scala


示例13: CommandStreamProcessor

//设置package包名称以及导入依赖的类
package com.crystal
package processors

// Spark
import org.apache.spark.streaming.kinesis._
import org.apache.spark.streaming.{ Duration, StreamingContext }
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

// JSON Parsing
import scala.util.parsing.json.JSON

object CommandStreamProcessor {
  def setup(appConfig: AppConfig, streamingCtx: StreamingContext) = {
    val cmdStream = getCommandStream(appConfig, streamingCtx)

    cmdStream.foreachRDD { rdd =>
      rdd.foreach{ cmd =>
        println("--- Command Received ---")
      }
    }
  }

  private def getCommandStream(
    appConfig: AppConfig,
    streamingCtx: StreamingContext): DStream[Map[String, Any]] = {
    val stream = KinesisUtils.createStream(
      streamingCtx,
      appConfig.commandAppName,
      appConfig.commandStreamName,
      s"kinesis.${appConfig.regionName}.amazonaws.com",
      appConfig.regionName,
      InitialPositionInStream.LATEST,
      Duration(appConfig.checkpointInterval),
      StorageLevel.MEMORY_AND_DISK_2
    )

    stream
      .map { byteArray => new String(byteArray) }
      .map { jsonStr => JSON.parseFull(jsonStr).get.asInstanceOf[Map[String, Any]] }
  }

} 
开发者ID:crystal-project-inc,项目名称:streaming_user_segmentation,代码行数:45,代码来源:CommandStreamProcessor.scala


示例14: RedditStreamFactory

//设置package包名称以及导入依赖的类
package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories

import com.github.catalystcode.fortis.spark.streaming.reddit.dto.RedditObject
import com.github.catalystcode.fortis.spark.streaming.reddit.{RedditAuth, RedditUtils}
import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, StreamFactory}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

class RedditStreamFactory extends StreamFactoryBase[RedditObject] {
  override protected def canHandle(connectorConfig: ConnectorConfig): Boolean = {
    connectorConfig.name == "RedditObject"
  }

  override protected def buildStream(ssc: StreamingContext, connectorConfig: ConnectorConfig): DStream[RedditObject] = {
    import ParameterExtensions._

    val params = connectorConfig.parameters
    val auth = RedditAuth(params.getAs[String]("applicationId"), params.getAs[String]("applicationSecret"))
    val keywords = params.getAs[String]("keywords").split('|')
    val subreddit = params.get("subreddit").asInstanceOf[Option[String]]
    val searchLimit = params.getOrElse("searchLimit", "25").asInstanceOf[String].toInt
    val searchResultType = Some(params.getOrElse("searchResultType", "link").asInstanceOf[String])

    RedditUtils.createPageStream(
      auth,
      keywords.toSeq,
      ssc,
      subredit = subreddit,
      searchLimit = searchLimit,
      searchResultType = searchResultType
    )
  }
} 
开发者ID:CatalystCode,项目名称:project-fortis-spark,代码行数:34,代码来源:RedditStreamFactory.scala


示例15: RadioStreamFactory

//设置package包名称以及导入依赖的类
package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories

import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, StreamFactory}
import com.microsoft.partnercatalyst.fortis.spark.sources.streamwrappers.radio.{RadioStreamUtils, RadioTranscription}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

class RadioStreamFactory extends StreamFactoryBase[RadioTranscription]{
  override protected def canHandle(connectorConfig: ConnectorConfig): Boolean = {
    connectorConfig.name == "Radio"
  }

  override protected def buildStream(ssc: StreamingContext, connectorConfig: ConnectorConfig): DStream[RadioTranscription] = {
    import ParameterExtensions._

    val params = connectorConfig.parameters

    RadioStreamUtils.createStream(ssc,
      params.getAs[String]("radioUrl"),
      params.getAs[String]("audioType"),
      params.getAs[String]("locale"),
      params.getAs[String]("subscriptionKey"),
      params.getAs[String]("speechType"),
      params.getAs[String]("outputFormat")
    )
  }
} 
开发者ID:CatalystCode,项目名称:project-fortis-spark,代码行数:28,代码来源:RadioStreamFactory.scala


示例16: BingPageStreamFactory

//设置package包名称以及导入依赖的类
package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories

import com.github.catalystcode.fortis.spark.streaming.bing.dto.BingPost
import com.github.catalystcode.fortis.spark.streaming.bing.{BingAuth, BingUtils}
import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, StreamFactory}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

class BingPageStreamFactory extends StreamFactoryBase[BingPost]{
  override protected def canHandle(connectorConfig: ConnectorConfig): Boolean = {
    connectorConfig.name == "BingPage"
  }

  override protected def buildStream(ssc: StreamingContext, connectorConfig: ConnectorConfig): DStream[BingPost] = {
    import ParameterExtensions._

    val params = connectorConfig.parameters
    val auth = BingAuth(params.getAs[String]("accessToken"))
    val searchInstanceId = params.getAs[String]("searchInstanceId")
    val keywords = params.getAs[String]("keywords").split('|')

    BingUtils.createPageStream(ssc, auth, searchInstanceId, keywords)
  }
} 
开发者ID:CatalystCode,项目名称:project-fortis-spark,代码行数:25,代码来源:BingPageStreamFactory.scala


示例17: InstagramTagStreamFactory

//设置package包名称以及导入依赖的类
package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories

import com.github.catalystcode.fortis.spark.streaming.instagram.dto.InstagramItem
import com.github.catalystcode.fortis.spark.streaming.instagram.{InstagramAuth, InstagramUtils}
import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, StreamFactory}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

class InstagramTagStreamFactory extends StreamFactoryBase[InstagramItem]{
  override protected def canHandle(connectorConfig: ConnectorConfig): Boolean = {
    connectorConfig.name == "InstagramTag"
  }

  override protected def buildStream(ssc: StreamingContext, connectorConfig: ConnectorConfig): DStream[InstagramItem] = {
    import ParameterExtensions._

    val params = connectorConfig.parameters
    val auth = InstagramAuth(params.getAs[String]("authToken"))

    InstagramUtils.createTagStream(ssc, auth, params.getAs[String]("tag"))
  }
} 
开发者ID:CatalystCode,项目名称:project-fortis-spark,代码行数:23,代码来源:InstagramTagStreamFactory.scala


示例18: FacebookPageStreamFactory

//设置package包名称以及导入依赖的类
package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories

import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost
import com.github.catalystcode.fortis.spark.streaming.facebook.{FacebookAuth, FacebookUtils}
import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, StreamFactory}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

class FacebookPageStreamFactory extends StreamFactoryBase[FacebookPost] {
  override protected def canHandle(connectorConfig: ConnectorConfig): Boolean = {
    connectorConfig.name == "FacebookPage"
  }

  override protected def buildStream(ssc: StreamingContext, connectorConfig: ConnectorConfig): DStream[FacebookPost] = {
    import ParameterExtensions._

    val params = connectorConfig.parameters
    val facebookAuth = FacebookAuth(
      params.getAs[String]("appId"),
      params.getAs[String]("appSecret"),
      params.getAs[String]("accessToken")
    )

    FacebookUtils.createPageStreams(ssc, facebookAuth, params.getTrustedSources.toSet)
  }
} 
开发者ID:CatalystCode,项目名称:project-fortis-spark,代码行数:27,代码来源:FacebookPageStreamFactory.scala


示例19: InstagramLocationStreamFactory

//设置package包名称以及导入依赖的类
package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories

import com.github.catalystcode.fortis.spark.streaming.instagram.dto.InstagramItem
import com.github.catalystcode.fortis.spark.streaming.instagram.{InstagramAuth, InstagramUtils}
import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, StreamFactory}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

class InstagramLocationStreamFactory extends StreamFactoryBase[InstagramItem]{
  override protected def canHandle(connectorConfig: ConnectorConfig): Boolean = {
    connectorConfig.name == "InstagramLocation"
  }

  override protected def buildStream(ssc: StreamingContext, connectorConfig: ConnectorConfig): DStream[InstagramItem] = {
    import ParameterExtensions._

    val params = connectorConfig.parameters
    val auth = InstagramAuth(params.getAs[String]("authToken"))

    InstagramUtils.createLocationStream(
      ssc,
      auth,
      latitude = params.getAs[String]("latitude").toDouble,
      longitude = params.getAs[String]("longitude").toDouble)
  }
} 
开发者ID:CatalystCode,项目名称:project-fortis-spark,代码行数:27,代码来源:InstagramLocationStreamFactory.scala


示例20: FacebookCommentStreamFactory

//设置package包名称以及导入依赖的类
package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories

import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookComment
import com.github.catalystcode.fortis.spark.streaming.facebook.{FacebookAuth, FacebookUtils}
import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, StreamFactory}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

class FacebookCommentStreamFactory extends StreamFactoryBase[FacebookComment] {
  override protected def canHandle(connectorConfig: ConnectorConfig): Boolean = {
    connectorConfig.name == "FacebookComment"
  }

  override protected def buildStream(ssc: StreamingContext, connectorConfig: ConnectorConfig): DStream[FacebookComment] = {
    import ParameterExtensions._

    val params = connectorConfig.parameters
    val facebookAuth = FacebookAuth(
      params.getAs[String]("appId"),
      params.getAs[String]("appSecret"),
      params.getAs[String]("accessToken")
    )

    FacebookUtils.createCommentsStreams(ssc, facebookAuth, params.getTrustedSources.toSet)
  }
} 
开发者ID:CatalystCode,项目名称:project-fortis-spark,代码行数:27,代码来源:FacebookCommentsStreamFactory.scala



注:本文中的org.apache.spark.streaming.dstream.DStream类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala SprayJsonSupport类代码示例发布时间:2022-05-23
下一篇:
Scala URLClassLoader类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap