This article collects typical usage examples of the org.apache.spark.streaming.dstream.DStream class in Scala. If you are wondering what the DStream class is for, how to use it, or are looking for working examples, the hand-picked code samples below should help.
Twenty code examples of the DStream class are shown below, sorted by popularity by default. You can upvote the examples you find useful; your feedback helps the system recommend better Scala code samples.
Example 1: KafkaPayload
// Package declaration and imported dependencies
package tools
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
case class KafkaPayload(value: Array[Byte])
class KafkaDStreamSource(config: Map[String, String]) {
def createSource(ssc: StreamingContext, topic: String): DStream[KafkaPayload] = {
val kafkaParams = config
val kafkaTopics = Set(topic)
KafkaUtils.
createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
ssc,
kafkaParams,
kafkaTopics).
map(dStream => KafkaPayload(dStream._2))
}
}
object KafkaDStreamSource {
def apply(config: Map[String, String]): KafkaDStreamSource = new KafkaDStreamSource(config)
}
Author: Antwnis | Project: kafka-streaming-examples | Lines of code: 29 | Source: KafkaDStreamSource.scala
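A minimal usage sketch for the source above; the broker address, topic name, batch interval, and application name are illustrative placeholders rather than values from the original project:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import tools.KafkaDStreamSource
object KafkaDStreamSourceExample extends App {
  // Local StreamingContext with a 5-second batch interval (placeholder settings)
  val conf = new SparkConf().setMaster("local[*]").setAppName("kafka-dstream-source-example")
  val ssc = new StreamingContext(conf, Seconds(5))
  // "localhost:9092" and "events" are illustrative values
  val source = KafkaDStreamSource(Map("metadata.broker.list" -> "localhost:9092"))
  val payloads = source.createSource(ssc, "events")
  // Print the size of each received payload per batch
  payloads.map(_.value.length).print()
  ssc.start()
  ssc.awaitTermination()
}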
Example 2: Collector
// Package declaration and imported dependencies
package com.databricks.apps.twitterClassifier
import com.google.gson.Gson
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.twitter.TwitterUtils
object Collector {
def doIt(options: CollectOptions, sc: SparkContext, ssc: StreamingContext) {
val tweetStream: DStream[String] = TwitterUtils.createStream(ssc, maybeTwitterAuth)
.map(new Gson().toJson(_))
var numTweetsCollected = 0L
tweetStream.foreachRDD { (rdd, time) =>
val count = rdd.count
if (count > 0) {
rdd.saveAsTextFile(options.tweetDirectory.getAbsolutePath)
numTweetsCollected += count
if (numTweetsCollected > options.numTweetsToCollect) System.exit(0)
}
}
ssc.start()
ssc.awaitTermination()
}
}
Author: krish121 | Project: Spark-reference-applications | Lines of code: 29 | Source: Collect.scala
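The core of Collector is the foreachRDD block that counts each batch and stops once enough tweets have been gathered. A sketch of the same pattern on a queue-backed stream, so it can be tried without Twitter credentials (all names and values here are illustrative):
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.Queue
object ForeachRDDCountExample extends App {
  val ssc = new StreamingContext(new SparkConf().setMaster("local[*]").setAppName("count-example"), Seconds(1))
  val queue = Queue[RDD[String]](ssc.sparkContext.parallelize(Seq("tweet1", "tweet2", "tweet3")))
  var numCollected = 0L
  ssc.queueStream(queue).foreachRDD { rdd =>
    // foreachRDD runs on the driver, so the counter can be updated safely here
    val count = rdd.count()
    if (count > 0) {
      numCollected += count
      println(s"collected $numCollected records so far")
    }
  }
  ssc.start()
  ssc.awaitTerminationOrTimeout(5000)
  ssc.stop()
}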
Example 3: Combinators
// Package declaration and imported dependencies
package com.stratio.ioft.streaming.transformations
import com.stratio.ioft.domain._
import com.stratio.ioft.domain.measures.{Acceleration, Attitude}
import com.stratio.ioft.domain.states.AttitudeHistory
import com.stratio.ioft.util.Math.Geometry._
import org.apache.spark.streaming.dstream.DStream
import com.stratio.ioft.domain.measures.VectorMeasure._
import com.stratio.ioft.streaming.transformations.Aggregators.attitudeHistoryStream
import org.apache.spark.streaming.Milliseconds
object Combinators {
def desiredAndActualAttitudeStream(
desiredAttitudeStream: DStream[(DroneIdType, (BigInt, Attitude))],
attitudeStream: DStream[(DroneIdType, (BigInt, Attitude))],
timeRange: Long
): DStream[(DroneIdType, (BigInt, Attitude, Attitude))] = {
def windowed(stream: DStream[(DroneIdType, (BigInt, Attitude))]) = {
val windowDuration = Milliseconds(timeRange)
stream.window(windowDuration, windowDuration)
}
windowed(desiredAttitudeStream) join attitudeHistoryStream(windowed(attitudeStream)) flatMap {
case (id, ( (ts, desired), actualAttitudeHistory)) =>
val closestAttitudes = actualAttitudeHistory.attitudeAt(ts)
closestAttitudes.headOption map { _ =>
val (_, actualAttitude: Attitude) = closestAttitudes.minBy {
case (actual_ts, _) => math.abs((actual_ts-ts).toLong)
}
id -> (ts, desired, actualAttitude)
}
}
}
}
Author: pfcoperez | Project: sparkstream_ioft | Lines of code: 41 | Source: Combinators.scala
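desiredAndActualAttitudeStream windows both keyed streams over the same duration and then joins them. The same window-then-join shape, reduced to plain (key, value) pairs (a sketch, not part of the original project):
import org.apache.spark.streaming.Milliseconds
import org.apache.spark.streaming.dstream.DStream
object WindowedJoinSketch {
  def windowedJoin(left: DStream[(String, Int)],
                   right: DStream[(String, Int)],
                   timeRange: Long): DStream[(String, (Int, Int))] = {
    // Window length equals the slide interval, as in the combinator above
    val duration = Milliseconds(timeRange)
    left.window(duration, duration) join right.window(duration, duration)
  }
}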
Example 4: VeChallengeRealTime
// Package declaration and imported dependencies
package io.github.adrianulbona.ve
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Minutes, StreamingContext}
import twitter4j.Place
object VeChallengeRealTime {
def main(args: Array[String]) {
val spark = SparkSession.builder
.master("local[*]")
.appName("ve-challenge")
.getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Minutes(2))
val stream = TwitterUtils.createStream(ssc, None, Seq("challenge"))
val places: DStream[Place] = stream.map(status => Option(status.getPlace))
.filter(optionPlace => optionPlace.isDefined)
.map(place => place.get)
places.map(place => place.getCountryCode)
.countByValue()
.foreachRDD(batch => printStats(batch.sortBy({ case (_, count) => count }, ascending = false).take(5)))
ssc.start()
ssc.awaitTermination()
spark.stop()
}
def printStats(top5Countries: Array[(String, Long)]) {
println()
println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()))
top5Countries.foreach(println)
}
}
Author: adrianulbona | Project: ve-challenge | Lines of code: 45 | Source: VeChallengeRealTime.scala
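The countByValue plus per-batch top-N step above works for any DStream of keys; extracted here as a reusable helper (printTopN is a hypothetical name, not part of the original project):
import org.apache.spark.streaming.dstream.DStream
object TopNSketch {
  // Counts each distinct value per batch and prints the n most frequent ones
  def printTopN(values: DStream[String], n: Int): Unit =
    values
      .countByValue()
      .foreachRDD { batch =>
        batch.sortBy({ case (_, count) => count }, ascending = false).take(n).foreach(println)
      }
}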
Example 5: LogAnalyzerWindowed
// Package declaration and imported dependencies
package com.databricks.apps.logs
import scala.math.Ordering
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream
class LogAnalyzerWindowed(val windowLength: Long, val slideInterval: Long) extends AnalyzeFunctions with Serializable {
import LogStatistics.EMPTY_LOG_STATISTICS
var logStatistics = EMPTY_LOG_STATISTICS
def processAccessLogs(accessLogsDStream: DStream[ApacheAccessLog]): Unit = {
val windowDStream: DStream[ApacheAccessLog] = accessLogsDStream
.window(Seconds(windowLength), Seconds(slideInterval))
windowDStream.foreachRDD(accessLogs => {
if (accessLogs.count() == 0) {
logStatistics = EMPTY_LOG_STATISTICS
} else {
logStatistics = LogStatistics(contentSizeStats(accessLogs).get,
responseCodeCount(accessLogs).take(100).toMap,
filterIPAddress(ipAddressCount(accessLogs)).take(100),
endpointCount(accessLogs).top(10)(Ordering.by[(String, Long), Long](_._2)).toMap)
}
})
}
def getLogStatistics: LogStatistics = logStatistics
}
Author: krish121 | Project: Spark-reference-applications | Lines of code: 32 | Source: LogAnalyzerWindowed.scala
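LogAnalyzerWindowed recomputes full statistics over each window with foreachRDD. When only keyed counts are needed, the same windowing can be expressed incrementally with reduceByKeyAndWindow; a sketch that is not part of the original analyzer:
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream
object WindowedCountSketch {
  // Windowed response-code counts, maintained incrementally instead of recomputed per window
  def responseCodeCounts(codes: DStream[Int],
                         windowLength: Long,
                         slideInterval: Long): DStream[(Int, Long)] =
    codes
      .map(code => (code, 1L))
      .reduceByKeyAndWindow((a: Long, b: Long) => a + b, Seconds(windowLength), Seconds(slideInterval))
}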
Example 6: LogAnalyzerStreamingImportDirectory
// Package declaration and imported dependencies
package com.databricks.apps.logs.chapter2
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.databricks.apps.logs.{ApacheAccessLog, LogAnalyzerRDD}
object LogAnalyzerStreamingImportDirectory extends App {
val WINDOW_LENGTH = Seconds(30)
val SLIDE_INTERVAL = Seconds(10)
val spark = SparkSession
.builder()
.appName("Log Analyzer Import Streaming HDFS")
.getOrCreate()
val streamingContext = new StreamingContext(spark.sparkContext, SLIDE_INTERVAL)
val directory = args(0)
// This method monitors a directory for new files to read in for streaming.
val logData: DStream[String] = streamingContext.textFileStream(directory)
val accessLogsDStream: DStream[ApacheAccessLog] = logData.map(ApacheAccessLog.parseLogLine)
val windowDStream: DStream[ApacheAccessLog] = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)
val logAnalyzerRDD = LogAnalyzerRDD(spark)
windowDStream.foreachRDD(accessLogs => {
if (accessLogs.count() == 0) {
println("No access logs received in this time interval")
} else {
val logStatistics = logAnalyzerRDD.processRdd(accessLogs)
logStatistics.printToStandardOut()
}
})
// Start the streaming server.
streamingContext.start() // Start the computation
streamingContext.awaitTermination() // Wait for the computation to terminate
}
Author: krish121 | Project: Spark-reference-applications | Lines of code: 42 | Source: LogAnalyzerStreamingImportDirectory.scala
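A reduced form of the same directory-monitoring pipeline, kept to the streaming part only (a sketch, not taken from the original project). Note that textFileStream only reads files that appear in the monitored directory after the streaming context has started:
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object DirectoryStreamSketch {
  // Monitor a directory for new text files and expose the lines over a sliding window
  def windowedLogLines(ssc: StreamingContext, directory: String): DStream[String] =
    ssc.textFileStream(directory).window(Seconds(30), Seconds(10))
}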
Example 7: StreamActor
// Package declaration and imported dependencies
package services
import akka.actor.Actor.Receive
import akka.actor.{Actor, ActorRef, Props}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import play.api.libs.json.Json
import services.StreamActor.TopHashtag
import twitter4j.Status
class StreamActor[T] (val out : ActorRef, val stream : DStream[T]) extends Actor {
stream.foreachRDD { rdd =>
rdd.take(5).foreach { element =>
out ! element.toString
}
}
override def receive: Receive = {
case msg: String => out ! "Ok"
case TopHashtag(top) => out ! Json.toJson(top)
}
@scala.throws[Exception](classOf[Exception])
override def postStop(): Unit = {
}
}
object StreamActor {
def props[T](out: ActorRef, stream : DStream[T]) = Props(new StreamActor(out, stream))
case class TopHashtag(top : Map[String, Int])
}
Author: OpenCompare | Project: live-pcm | Lines of code: 35 | Source: StreamActor.scala
Example 8: createStream
// Package declaration and imported dependencies
package it.agilelab.bigdata.wasp.consumers.readers
import it.agilelab.bigdata.wasp.core.WaspSystem
import it.agilelab.bigdata.wasp.core.WaspSystem._
import it.agilelab.bigdata.wasp.core.kafka.CheckOrCreateTopic
import it.agilelab.bigdata.wasp.core.logging.WaspLogger
import it.agilelab.bigdata.wasp.core.models.{DefaultConfiguration, TopicModel}
import it.agilelab.bigdata.wasp.core.utils.{AvroToJsonUtil, ConfigManager, JsonToByteArrayUtil}
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
//TODO: check warning (not understood)
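// Note: this excerpt omits the enclosing object declaration from KafkaReader.scala;
// the `logger` referenced below and the final closing brace belong to that omitted wrapper.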
def createStream(group: String, topic: TopicModel)(implicit ssc: StreamingContext): DStream[String] = {
val kafkaConfig = ConfigManager.getKafkaConfig
val kafkaConfigMap: Map[String, String] = Map(
"zookeeper.connect" -> kafkaConfig.zookeeper.toString,
"zookeeper.connection.timeout.ms" -> kafkaConfig.zookeeper.timeout.getOrElse(DefaultConfiguration.timeout).toString
)
if (??[Boolean](WaspSystem.getKafkaAdminActor, CheckOrCreateTopic(topic.name, topic.partitions, topic.replicas))) {
val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
ssc,
kafkaConfigMap + ("group.id" -> group),
Map(topic.name -> 3),
StorageLevel.MEMORY_AND_DISK_2
)
topic.topicDataType match {
case "avro" => receiver.map(x => (x._1, AvroToJsonUtil.avroToJson(x._2))).map(_._2)
case "json" => receiver.map(x => (x._1, JsonToByteArrayUtil.byteArrayToJson(x._2))).map(_._2)
case _ => receiver.map(x => (x._1, AvroToJsonUtil.avroToJson(x._2))).map(_._2)
}
} else {
logger.error(s"Topic not found on Kafka: $topic")
throw new Exception(s"Topic not found on Kafka: $topic")
}
}
}
Author: agile-lab-dev | Project: wasp | Lines of code: 47 | Source: KafkaReader.scala
Example 9: TwitterUtils
// Package declaration and imported dependencies
package com.aluxian.tweeather.streaming
import java.util.Properties
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import twitter4j.auth.{AccessToken, Authorization}
import twitter4j.{FilterQuery, Status, TwitterFactory}
object TwitterUtils {
def createMultiStream(ssc: StreamingContext,
queryBuilder: () => FilterQuery = () => null,
credentials: Seq[Authorization] = loadDefaultCredentials(),
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER
): DStream[Status] = {
credentials
.map(auth => createStream(ssc, Some(queryBuilder()), Some(auth)))
.reduce { (accStream, stream) => accStream.union(stream) }
}
private def loadDefaultCredentials(): Seq[Authorization] = {
val props = loadTwitterProperties()
val num = props.getProperty("twitter.credentials").toInt
1.to(num).map(i => {
val twitter = new TwitterFactory().getInstance()
twitter.setOAuthConsumer(
props.getProperty(s"twitter.credentials.$i.consumerKey"),
props.getProperty(s"twitter.credentials.$i.consumerSecret")
)
twitter.setOAuthAccessToken(new AccessToken(
props.getProperty(s"twitter.credentials.$i.token"),
props.getProperty(s"twitter.credentials.$i.tokenSecret")
))
twitter.getAuthorization
})
}
private def loadTwitterProperties(): Properties = {
val properties = new Properties()
val stream = getClass.getResourceAsStream("/com/aluxian/tweeather/res/twitter.properties")
properties.load(stream)
stream.close()
properties
}
}
Author: cnajeefa | Project: Tourism-Sentiment-Analysis | Lines of code: 53 | Source: TwitterUtils.scala
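createMultiStream builds one Twitter stream per credential and merges them by folding over union (the createStream method it calls is defined elsewhere in the original object and is not shown in this excerpt). The merge step generalises to any DStream element type; a sketch, not part of the original project:
import scala.reflect.ClassTag
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
object UnionSketch {
  // Equivalent to streams.reduce(_ union _), but merged in a single union operation
  def unionAll[T: ClassTag](ssc: StreamingContext, streams: Seq[DStream[T]]): DStream[T] =
    ssc.union(streams)
}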
Example 10: KafkaEngine
// Package declaration and imported dependencies
package com.lljv.analytics.hadoopengine
import java.util.HashMap
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import scala.util.control.NonFatal
class KafkaEngine(val settings: KafkaSettings) extends Serializable {
var producer: Option[KafkaProducer[String, String]] = None
def getStreamingParameters(): Map[String, String] = {
val parameters = Map[String, String](
"metadata.broker.list" -> settings.kafkaBroker,
"bootstrap.servers" -> settings.kafkaBroker,
"key.serializer" -> settings.stringSerializerType,
"value.serializer" -> settings.stringSerializerType,
"key.deserializer" -> settings.stringDeserializerType,
"value.deserializer" -> settings.stringDeserializerType,
"group.id" -> settings.consumerGroupId
)
return parameters
}
def startStream(
topicName: String,
sparkEngine: SparkStreamEngine
): Option[InputDStream[ConsumerRecord[String, String]]] =
{
val stream: Option[InputDStream[ConsumerRecord[String, String]]] = try {
Some(KafkaUtils.createDirectStream[String,String](
sparkEngine.streamingContext.get,
PreferConsistent,
Subscribe[String, String](Array(topicName), this.getStreamingParameters())
))
} catch {
case NonFatal(exc) => {
// printf(exc.getMessage())
// TODO: logging
None
}
}
return stream
}
}
Author: dotdeb | Project: Science-Finder | Lines of code: 56 | Source: KafkaEngine.scala
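startStream returns an Option over an InputDStream of ConsumerRecord values. One way the result could be consumed, extracting the message payload per record (a sketch; printPayloads is a hypothetical helper, not part of the original engine):
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
object KafkaEngineUsageSketch {
  def printPayloads(maybeStream: Option[InputDStream[ConsumerRecord[String, String]]]): Unit =
    maybeStream match {
      // ConsumerRecord#value yields the message payload
      case Some(stream) => stream.map(record => record.value).print()
      case None         => println("Kafka stream could not be created")
    }
}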
Example 11: StreamingExample
// Package declaration and imported dependencies
package com.pygmalios.reactiveinflux.spark.examples
import com.pygmalios.reactiveinflux._
import com.pygmalios.reactiveinflux.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime
import scala.concurrent.duration._
object StreamingExample extends App {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Example")
val ssc = new StreamingContext(conf, Seconds(1))
val point1 = Point(
time = DateTime.now(),
measurement = "measurement1",
tags = Map(
"tagKey1" -> "tagValue1",
"tagKey2" -> "tagValue2"),
fields = Map(
"fieldKey1" -> "fieldValue1",
"fieldKey2" -> 10.7)
)
// Provide settings for reactiveinflux
implicit val params = ReactiveInfluxDbName("example")
implicit val awaitAtMost = 1.second
// Create DStream of Influx points
val queue = new scala.collection.mutable.Queue[RDD[Point]]
val queueStream: DStream[Point] = ssc.queueStream(queue)
// Add single RDD with a single Influx point to the DStream
queue.enqueue(ssc.sparkContext.parallelize(Seq(point1)))
// Save DStream to Influx
queueStream.saveToInflux()
// Start Spark streaming
ssc.start()
ssc.awaitTermination()
}
Author: pygmalios | Project: reactiveinflux-spark | Lines of code: 49 | Source: StreamingExample.scala
Example 12: PointDStreamExtensionsSpec
// Package declaration and imported dependencies
package com.pygmalios.reactiveinflux.extensions
import com.holdenkarau.spark.testing.StreamingActionBase
import com.pygmalios.reactiveinflux.spark._
import com.pygmalios.reactiveinflux._
import org.apache.spark.streaming.dstream.DStream
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
@RunWith(classOf[JUnitRunner])
class PointDStreamExtensionsSpec extends StreamingActionBase
with BeforeAndAfterAll {
import PointRDDExtensionsSpec._
override def beforeAll: Unit = {
super.beforeAll
withInflux(_.create())
}
override def afterAll: Unit = {
withInflux(_.drop())
super.afterAll
}
test("write single point to Influx") {
val points = List(point1)
// Execute
runAction(Seq(points), (dstream: DStream[Point]) => dstream.saveToInflux())
// Assert
val result = withInflux(
_.query(Query(s"SELECT * FROM $measurement1")).result.singleSeries)
assert(result.rows.size == 1)
val row = result.rows.head
assert(row.time == point1.time)
assert(row.values.size == 5)
}
}
Author: pygmalios | Project: reactiveinflux-spark | Lines of code: 43 | Source: PointDStreamExtensionsSpec.scala
Example 13: CommandStreamProcessor
// Package declaration and imported dependencies
package com.crystal
package processors
// Spark
import org.apache.spark.streaming.kinesis._
import org.apache.spark.streaming.{ Duration, StreamingContext }
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.storage.StorageLevel
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
// JSON Parsing
import scala.util.parsing.json.JSON
object CommandStreamProcessor {
def setup(appConfig: AppConfig, streamingCtx: StreamingContext) = {
val cmdStream = getCommandStream(appConfig, streamingCtx)
cmdStream.foreachRDD { rdd =>
rdd.foreach{ cmd =>
println("--- Command Received ---")
}
}
}
private def getCommandStream(
appConfig: AppConfig,
streamingCtx: StreamingContext): DStream[Map[String, Any]] = {
val stream = KinesisUtils.createStream(
streamingCtx,
appConfig.commandAppName,
appConfig.commandStreamName,
s"kinesis.${appConfig.regionName}.amazonaws.com",
appConfig.regionName,
InitialPositionInStream.LATEST,
Duration(appConfig.checkpointInterval),
StorageLevel.MEMORY_AND_DISK_2
)
stream
.map { byteArray => new String(byteArray) }
.map { jsonStr => JSON.parseFull(jsonStr).get.asInstanceOf[Map[String, Any]] }
}
}
Author: crystal-project-inc | Project: streaming_user_segmentation | Lines of code: 45 | Source: CommandStreamProcessor.scala
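getCommandStream calls JSON.parseFull(...).get, which throws on malformed records. A more defensive variant that drops unparseable messages instead (a sketch; parseCommands is a hypothetical name, not part of the original processor):
import scala.util.parsing.json.JSON
import org.apache.spark.streaming.dstream.DStream
object SafeJsonParseSketch {
  def parseCommands(raw: DStream[Array[Byte]]): DStream[Map[String, Any]] =
    raw
      .map(byteArray => new String(byteArray))
      .flatMap { jsonStr =>
        // parseFull returns None for malformed JSON instead of throwing
        JSON.parseFull(jsonStr).collect { case cmd: Map[String @unchecked, Any @unchecked] => cmd }
      }
}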
Example 14: RedditStreamFactory
// Package declaration and imported dependencies
package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories
import com.github.catalystcode.fortis.spark.streaming.reddit.dto.RedditObject
import com.github.catalystcode.fortis.spark.streaming.reddit.{RedditAuth, RedditUtils}
import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, StreamFactory}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
class RedditStreamFactory extends StreamFactoryBase[RedditObject] {
override protected def canHandle(connectorConfig: ConnectorConfig): Boolean = {
connectorConfig.name == "RedditObject"
}
override protected def buildStream(ssc: StreamingContext, connectorConfig: ConnectorConfig): DStream[RedditObject] = {
import ParameterExtensions._
val params = connectorConfig.parameters
val auth = RedditAuth(params.getAs[String]("applicationId"), params.getAs[String]("applicationSecret"))
val keywords = params.getAs[String]("keywords").split('|')
val subreddit = params.get("subreddit").asInstanceOf[Option[String]]
val searchLimit = params.getOrElse("searchLimit", "25").asInstanceOf[String].toInt
val searchResultType = Some(params.getOrElse("searchResultType", "link").asInstanceOf[String])
RedditUtils.createPageStream(
auth,
keywords.toSeq,
ssc,
subredit = subreddit,
searchLimit = searchLimit,
searchResultType = searchResultType
)
}
}
Author: CatalystCode | Project: project-fortis-spark | Lines of code: 34 | Source: RedditStreamFactory.scala
Example 15: RadioStreamFactory
// Package declaration and imported dependencies
package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories
import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, StreamFactory}
import com.microsoft.partnercatalyst.fortis.spark.sources.streamwrappers.radio.{RadioStreamUtils, RadioTranscription}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
class RadioStreamFactory extends StreamFactoryBase[RadioTranscription]{
override protected def canHandle(connectorConfig: ConnectorConfig): Boolean = {
connectorConfig.name == "Radio"
}
override protected def buildStream(ssc: StreamingContext, connectorConfig: ConnectorConfig): DStream[RadioTranscription] = {
import ParameterExtensions._
val params = connectorConfig.parameters
RadioStreamUtils.createStream(ssc,
params.getAs[String]("radioUrl"),
params.getAs[String]("audioType"),
params.getAs[String]("locale"),
params.getAs[String]("subscriptionKey"),
params.getAs[String]("speechType"),
params.getAs[String]("outputFormat")
)
}
}
Author: CatalystCode | Project: project-fortis-spark | Lines of code: 28 | Source: RadioStreamFactory.scala
Example 16: BingPageStreamFactory
// Package declaration and imported dependencies
package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories
import com.github.catalystcode.fortis.spark.streaming.bing.dto.BingPost
import com.github.catalystcode.fortis.spark.streaming.bing.{BingAuth, BingUtils}
import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, StreamFactory}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
class BingPageStreamFactory extends StreamFactoryBase[BingPost]{
override protected def canHandle(connectorConfig: ConnectorConfig): Boolean = {
connectorConfig.name == "BingPage"
}
override protected def buildStream(ssc: StreamingContext, connectorConfig: ConnectorConfig): DStream[BingPost] = {
import ParameterExtensions._
val params = connectorConfig.parameters
val auth = BingAuth(params.getAs[String]("accessToken"))
val searchInstanceId = params.getAs[String]("searchInstanceId")
val keywords = params.getAs[String]("keywords").split('|')
BingUtils.createPageStream(ssc, auth, searchInstanceId, keywords)
}
}
Author: CatalystCode | Project: project-fortis-spark | Lines of code: 25 | Source: BingPageStreamFactory.scala
Example 17: InstagramTagStreamFactory
// Package declaration and imported dependencies
package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories
import com.github.catalystcode.fortis.spark.streaming.instagram.dto.InstagramItem
import com.github.catalystcode.fortis.spark.streaming.instagram.{InstagramAuth, InstagramUtils}
import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, StreamFactory}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
class InstagramTagStreamFactory extends StreamFactoryBase[InstagramItem]{
override protected def canHandle(connectorConfig: ConnectorConfig): Boolean = {
connectorConfig.name == "InstagramTag"
}
override protected def buildStream(ssc: StreamingContext, connectorConfig: ConnectorConfig): DStream[InstagramItem] = {
import ParameterExtensions._
val params = connectorConfig.parameters
val auth = InstagramAuth(params.getAs[String]("authToken"))
InstagramUtils.createTagStream(ssc, auth, params.getAs[String]("tag"))
}
}
Author: CatalystCode | Project: project-fortis-spark | Lines of code: 23 | Source: InstagramTagStreamFactory.scala
Example 18: FacebookPageStreamFactory
// Package declaration and imported dependencies
package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories
import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookPost
import com.github.catalystcode.fortis.spark.streaming.facebook.{FacebookAuth, FacebookUtils}
import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, StreamFactory}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
class FacebookPageStreamFactory extends StreamFactoryBase[FacebookPost] {
override protected def canHandle(connectorConfig: ConnectorConfig): Boolean = {
connectorConfig.name == "FacebookPage"
}
override protected def buildStream(ssc: StreamingContext, connectorConfig: ConnectorConfig): DStream[FacebookPost] = {
import ParameterExtensions._
val params = connectorConfig.parameters
val facebookAuth = FacebookAuth(
params.getAs[String]("appId"),
params.getAs[String]("appSecret"),
params.getAs[String]("accessToken")
)
FacebookUtils.createPageStreams(ssc, facebookAuth, params.getTrustedSources.toSet)
}
}
Author: CatalystCode | Project: project-fortis-spark | Lines of code: 27 | Source: FacebookPageStreamFactory.scala
Example 19: InstagramLocationStreamFactory
// Package declaration and imported dependencies
package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories
import com.github.catalystcode.fortis.spark.streaming.instagram.dto.InstagramItem
import com.github.catalystcode.fortis.spark.streaming.instagram.{InstagramAuth, InstagramUtils}
import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, StreamFactory}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
class InstagramLocationStreamFactory extends StreamFactoryBase[InstagramItem]{
override protected def canHandle(connectorConfig: ConnectorConfig): Boolean = {
connectorConfig.name == "InstagramLocation"
}
override protected def buildStream(ssc: StreamingContext, connectorConfig: ConnectorConfig): DStream[InstagramItem] = {
import ParameterExtensions._
val params = connectorConfig.parameters
val auth = InstagramAuth(params.getAs[String]("authToken"))
InstagramUtils.createLocationStream(
ssc,
auth,
latitude = params.getAs[String]("latitude").toDouble,
longitude = params.getAs[String]("longitude").toDouble)
}
}
Author: CatalystCode | Project: project-fortis-spark | Lines of code: 27 | Source: InstagramLocationStreamFactory.scala
Example 20: FacebookCommentStreamFactory
// Package declaration and imported dependencies
package com.microsoft.partnercatalyst.fortis.spark.sources.streamfactories
import com.github.catalystcode.fortis.spark.streaming.facebook.dto.FacebookComment
import com.github.catalystcode.fortis.spark.streaming.facebook.{FacebookAuth, FacebookUtils}
import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.{ConnectorConfig, StreamFactory}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
class FacebookCommentStreamFactory extends StreamFactoryBase[FacebookComment] {
override protected def canHandle(connectorConfig: ConnectorConfig): Boolean = {
connectorConfig.name == "FacebookComment"
}
override protected def buildStream(ssc: StreamingContext, connectorConfig: ConnectorConfig): DStream[FacebookComment] = {
import ParameterExtensions._
val params = connectorConfig.parameters
val facebookAuth = FacebookAuth(
params.getAs[String]("appId"),
params.getAs[String]("appSecret"),
params.getAs[String]("accessToken")
)
FacebookUtils.createCommentsStreams(ssc, facebookAuth, params.getTrustedSources.toSet)
}
}
Author: CatalystCode | Project: project-fortis-spark | Lines of code: 27 | Source: FacebookCommentsStreamFactory.scala
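Examples 14 through 20 all follow the same shape: canHandle matches on the connector name and buildStream reads the connector parameters and delegates to a library's stream constructor. A hypothetical factory written against the same base class, using a queue-backed stream as a stand-in data source (the "Echo" connector and its parameter are purely illustrative, not part of project-fortis-spark):
import com.microsoft.partnercatalyst.fortis.spark.sources.streamprovider.ConnectorConfig
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import scala.collection.mutable.Queue
class EchoStreamFactory extends StreamFactoryBase[String] {
  override protected def canHandle(connectorConfig: ConnectorConfig): Boolean =
    connectorConfig.name == "Echo"
  override protected def buildStream(ssc: StreamingContext, connectorConfig: ConnectorConfig): DStream[String] = {
    import ParameterExtensions._
    // "message" is an illustrative connector parameter
    val message = connectorConfig.parameters.getAs[String]("message")
    ssc.queueStream(Queue[RDD[String]](ssc.sparkContext.parallelize(Seq(message))))
  }
}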
Note: the org.apache.spark.streaming.dstream.DStream class examples in this article were compiled from source code and documentation hosted on GitHub, MSDocs, and similar platforms. The snippets were selected from open-source projects contributed by their respective authors; copyright remains with the original authors, and distribution and use should follow each project's license. Please do not republish without permission.