本文整理汇总了Scala中org.apache.spark.streaming.dstream.InputDStream类的典型用法代码示例。如果您正苦于以下问题:Scala InputDStream类的具体用法?Scala InputDStream怎么用?Scala InputDStream使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了InputDStream类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: KafkaUtility
//设置package包名称以及导入依赖的类
package com.knoldus.streaming.kafka
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
object KafkaUtility {
//TODO It should read from config
private val kafkaParams = Map(
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "earliest",
"group.id" -> "tweet-consumer"
)
private val preferredHosts = LocationStrategies.PreferConsistent
def createDStreamFromKafka(ssc: StreamingContext, topics: List[String]): InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](
ssc,
preferredHosts,
ConsumerStrategies.Subscribe[String, String](topics.distinct, kafkaParams)
)
}
开发者ID:knoldus,项目名称:real-time-stream-processing-engine,代码行数:32,代码来源:KafkaUtility.scala
示例2: RsvpStreaming
//设置package包名称以及导入依赖的类
package com.github.mmolimar.asks.streaming
import java.util.UUID
import com.github.mmolimar.askss.common.implicits._
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
object RsvpStreaming extends App with LazyLogging {
val filter = config.getString("spark.filter").toLowerCase.split(",").toList
val ssc = new StreamingContext(buildSparkConfig, Seconds(5))
//TODO
kafkaStream(ssc)
.map(_.value())
.map(_.toEvent)
.filter(rsvp => {
filter.exists(rsvp.event.get.event_name.contains(_))
})
.print()
ssc.start()
ssc.awaitTermination()
def buildSparkConfig(): SparkConf = {
new SparkConf()
.setMaster(config.getString("spark.master"))
.setAppName("RsvpStreaming")
.set("spark.streaming.ui.retainedBatches", "5")
.set("spark.streaming.backpressure.enabled", "true")
.set("spark.sql.parquet.compression.codec", "snappy")
.set("spark.sql.parquet.mergeSchema", "true")
.set("spark.sql.parquet.binaryAsString", "true")
}
def kafkaStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
val topics = Set(config.getString("kafka.topic"))
val kafkaParams = Map[String, Object](
"metadata.broker.list" -> config.getString("kafka.brokerList"),
"enable.auto.commit" -> config.getBoolean("kafka.autoCommit").toString,
"auto.offset.reset" -> config.getString("kafka.autoOffset"),
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> config.getString("kafka.brokerList"),
ConsumerConfig.GROUP_ID_CONFIG -> s"consumer-${UUID.randomUUID}",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer]
)
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)
}
}
开发者ID:mmolimar,项目名称:askss,代码行数:61,代码来源:RsvpStreaming.scala
示例3: KafkaEngine
//设置package包名称以及导入依赖的类
package com.lljv.analytics.hadoopengine
import java.util.HashMap
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import scala.util.control.NonFatal
class KafkaEngine(val settings: KafkaSettings) extends Serializable {
var producer: Option[KafkaProducer[String, String]] = None
def getStreamingParameters(): Map[String, String] = {
val parameters = Map[String, String](
"metadata.broker.list" -> settings.kafkaBroker,
"bootstrap.servers" -> settings.kafkaBroker,
"key.serializer" -> settings.stringSerializerType,
"value.serializer" -> settings.stringSerializerType,
"key.deserializer" -> settings.stringDeserializerType,
"value.deserializer" -> settings.stringDeserializerType,
"group.id" -> settings.consumerGroupId
)
return parameters
}
def startStream(
topicName: String,
sparkEngine: SparkStreamEngine
): Option[InputDStream[ConsumerRecord[String, String]]] =
{
val stream: Option[InputDStream[ConsumerRecord[String, String]]] = try {
Some(KafkaUtils.createDirectStream[String,String](
sparkEngine.streamingContext.get,
PreferConsistent,
Subscribe[String, String](Array(topicName), this.getStreamingParameters())
))
} catch {
case NonFatal(exc) => {
// printf(exc.getMessage())
// TODO: logging
None
}
}
return stream
}
}
开发者ID:dotdeb,项目名称:Science-Finder,代码行数:56,代码来源:KafkaEngine.scala
示例4: ParseTicketsSpec
//设置package包名称以及导入依赖的类
package com.octo.nad.handson.spark.streaming
import com.octo.nad.handson.model.Produit
import com.octo.nad.handson.model.Ticket
import com.octo.nad.handson.spark.StreamingPipeline
import com.octo.nad.handson.spark.specs.SparkStreamingSpec
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
class ParseTicketsSpec extends SparkStreamingSpec {
"La méthode parseTickets de l'object Pipeline" should "parser des tickets JSON et renvoyer des objets de la class Ticket" in {
val inputDstream = generateDStreamTicket
val result = StreamingPipeline.parseTickets(inputDstream)
var resultsRDD = scala.collection.mutable.ArrayBuffer.empty[Array[Ticket]]
result.foreachRDD(rdd => {
resultsRDD += rdd.collect()
})
ssc.start()
ssc.awaitTerminationOrTimeout(1000)
eventually{
val resultArrayFromRDD = resultsRDD.flatten.toList
resultArrayFromRDD should have size 1
resultArrayFromRDD.head should be(ticket)
}
}
private def generateDStreamTicket: InputDStream[String] = {
val lines = scala.collection.mutable.Queue[RDD[String]]()
val dstream = ssc.queueStream(lines)
val jsonString = ticket.toJson
lines += sc.makeRDD(Seq(jsonString))
dstream
}
private val ticket = {
val p1 = Produit(158, "Lessive Mir", 1, 2, 3, 2, BigDecimal(3.55))
val p2 = Produit(89, "Dentifrice", 14, 62, 23, 2, BigDecimal(4.35))
val p3 = Produit(10, "Brosse à dents", 21, 246, 3, 2, BigDecimal(2.12))
val p4 = Produit(7, "Chocolats", 1664, 26, 34, 2, BigDecimal(1.50))
val productList = p1 :: p2 :: p3 :: p4 :: Nil
Ticket(12, 313, 33, "04:44", Some(2), 12, BigDecimal(1), productList)
}
}
开发者ID:tmouron,项目名称:hands-on-spark,代码行数:56,代码来源:ParseTicketsSpec.scala
示例5: AddSectionStringSpec
//设置package包名称以及导入依赖的类
package com.octo.nad.handson.spark.streaming
import com.octo.nad.handson.spark.StreamingPipeline
import com.octo.nad.handson.spark.specs.SparkStreamingSpec
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
class AddSectionStringSpec extends SparkStreamingSpec {
"La méthode addSectionString de l'object Pipeline" should "faire la jointure entre le stream et le dataset des libellés de section pour ne plus avoir un identifiant technique de section mais un libellé" in {
val inputDStream = generateDStreamTicket
val result = StreamingPipeline.addSectionString(inputDStream)
var resultsRDD = scala.collection.mutable.ArrayBuffer.empty[Array[(String, BigDecimal)]]
result.foreachRDD(rdd => {
resultsRDD += rdd.collect()
})
ssc.start()
ssc.awaitTerminationOrTimeout(1000)
eventually{
val resultArrayFromRDD = resultsRDD.flatten.toList
resultArrayFromRDD should contain ("JARDIN VEGETAL", BigDecimal(1.22))
resultArrayFromRDD should contain ("MULTIMEDIA", BigDecimal(4.40))
}
}
private def generateDStreamTicket: InputDStream[(Int, BigDecimal)] = {
val lines = scala.collection.mutable.Queue[RDD[(Int, BigDecimal)]]()
val dstream = ssc.queueStream(lines)
lines += sc.makeRDD((233, BigDecimal(1.22)) :: (146, BigDecimal(4.40)) :: Nil)
dstream
}
}
开发者ID:tmouron,项目名称:hands-on-spark,代码行数:40,代码来源:AddSectionStringSpec.scala
示例6: ComputeCaBySectionSpec
//设置package包名称以及导入依赖的类
package com.octo.nad.handson.spark.streaming
import com.octo.nad.handson.model.Produit
import com.octo.nad.handson.spark.StreamingPipeline
import com.octo.nad.handson.spark.specs.SparkStreamingSpec
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
class ComputeCaBySectionSpec extends SparkStreamingSpec {
"La méthode computeCaBySection de l'object Pipeline" should "renvoyer un tuple (String,Double) => (Section,Ca cumulé)" in {
val inputDstream = generateDStreamArticle
val result = StreamingPipeline.computeCaBySection(inputDstream)
var resultsRDD = scala.collection.mutable.ArrayBuffer.empty[Array[(Int, BigDecimal)]]
result.foreachRDD(rdd => {
resultsRDD += rdd.collect()
})
ssc.start()
ssc.awaitTerminationOrTimeout(1000)
Thread.sleep(1000)
eventually{
val resultArrayFromRDD = resultsRDD.flatten.toList
resultArrayFromRDD.length should be(3)
resultArrayFromRDD should contain (1, BigDecimal(5.67))
resultArrayFromRDD should contain (14, BigDecimal(4.35))
resultArrayFromRDD should contain(1664, BigDecimal(1.50))
}
}
val p1 = Produit(158, "Lessive Mir", 1, 2, 3, 2, BigDecimal(3.55))
val p2 = Produit(89, "Dentifrice", 14, 62, 23, 2, BigDecimal(4.35))
val p3 = Produit(10, "Brosse à dents", 1, 246, 3, 2, BigDecimal(2.12))
val p4 = Produit(7, "Chocolats", 1664, 26, 34, 2, BigDecimal(1.50))
private def generateDStreamArticle: InputDStream[Produit] = {
val lines = scala.collection.mutable.Queue[RDD[Produit]]()
val dstream = ssc.queueStream(lines)
lines += sc.makeRDD(Seq(p1,p2,p3,p4))
dstream
}
}
开发者ID:tmouron,项目名称:hands-on-spark,代码行数:49,代码来源:ComputeCaBySectionSpec.scala
示例7: ExplodeTicketsSpec
//设置package包名称以及导入依赖的类
package com.octo.nad.handson.spark.streaming
import com.octo.nad.handson.model.Produit
import com.octo.nad.handson.model.Ticket
import com.octo.nad.handson.spark.StreamingPipeline
import com.octo.nad.handson.spark.specs.SparkStreamingSpec
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
class ExplodeTicketsSpec extends SparkStreamingSpec {
"La méthode explodeTickets de l'object Pipeline" should "exploser les tickets JSON en lignes produit" in {
val inputDstream = generateDStreamTicket
val result = StreamingPipeline.explodeTickets(inputDstream)
var resultsRDD = scala.collection.mutable.ArrayBuffer.empty[Array[Produit]]
result.foreachRDD(rdd => {
resultsRDD += rdd.collect()
})
ssc.start()
ssc.awaitTerminationOrTimeout(1000)
eventually{
val resultArrayFromRDD = resultsRDD.flatten.toList
resultArrayFromRDD should have size 4
resultArrayFromRDD should contain(p1)
resultArrayFromRDD should contain(p2)
resultArrayFromRDD should contain(p3)
resultArrayFromRDD should contain(p4)
}
}
private def generateDStreamTicket: InputDStream[Ticket] = {
val lines = scala.collection.mutable.Queue[RDD[Ticket]]()
val dstream = ssc.queueStream(lines)
lines += sc.makeRDD(ticket :: Nil)
dstream
}
private val p1 = Produit(158, "Lessive Mir", 1, 2, 3, 2, BigDecimal(3.55))
private val p2 = Produit(89, "Dentifrice", 14, 62, 23, 2, BigDecimal(4.35))
private val p3 = Produit(10, "Brosse à dents", 21, 246, 3, 2, BigDecimal(2.12))
private val p4 = Produit(7, "Chocolats", 1664, 26, 34, 2, BigDecimal(1.50))
private val ticket = {
val productList = p1 :: p2 :: p3 :: p4 :: Nil
Ticket(12, 313, 33, "04:44", Some(2), 12, BigDecimal(1), productList)
}
}
开发者ID:tmouron,项目名称:hands-on-spark,代码行数:59,代码来源:ExplodeTicketsSpec.scala
示例8: TestableQueueInputDStream
//设置package包名称以及导入依赖的类
package org.apache.spark.streaming
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming.dstream.InputDStream
import scala.collection.mutable.{ArrayBuffer, Queue}
import scala.reflect.ClassTag
class TestableQueueInputDStream[T: ClassTag](
ssc: StreamingContext,
val queue: Queue[RDD[T]],
oneAtATime: Boolean,
defaultRDD: RDD[T]
) extends InputDStream[T](ssc) {
override def start() { }
override def stop() { }
private def readObject(in: ObjectInputStream): Unit = {
logWarning("queueStream doesn't support checkpointing")
}
private def writeObject(oos: ObjectOutputStream): Unit = {
logWarning("queueStream doesn't support checkpointing")
}
override def compute(validTime: Time): Option[RDD[T]] = {
val buffer = new ArrayBuffer[RDD[T]]()
queue.synchronized {
if (oneAtATime && queue.nonEmpty) {
buffer += queue.dequeue()
} else {
buffer ++= queue
queue.clear()
}
}
if (buffer.nonEmpty) {
if (oneAtATime) {
Some(buffer.head)
} else {
Some(new UnionRDD(context.sc, buffer.toSeq))
}
} else if (defaultRDD != null) {
Some(defaultRDD)
} else {
Some(ssc.sparkContext.emptyRDD)
}
}
}
开发者ID:tmalaska,项目名称:SparkUnitTestingExamples,代码行数:54,代码来源:TestableQueueInputDStream.scala
示例9: updateZookeeper
//设置package包名称以及导入依赖的类
package com.groupon.dse.consumers
import java.util.Properties
import com.groupon.dse.configs.PluginConfigs
import com.groupon.dse.kafka.common.{State, WrappedMessage}
import com.groupon.dse.kafka.controllers.{StateController, StateControllerBuilder}
import com.groupon.dse.spark.plugins.ReceiverPlugin
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.TaskContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.slf4j.LoggerFactory
def updateZookeeper(messages: InputDStream[(String, Array[Byte])], stateController: StateController, appConfigs: Properties): Unit = {
messages.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { _ =>
val offsetRange = offsetRanges(TaskContext.get.partitionId)
val topic = offsetRange.topic
val partitionId = offsetRange.partition
val zkPath = s"${appConfigs.getProperty("statecontroller.zk.root")}/$topic/$partitionId"
stateController.setState(zkPath,State(offsetRange.untilOffset+1, System.currentTimeMillis()))
stateController.close()
}
}
}
}
开发者ID:groupon,项目名称:schema-inferer,代码行数:31,代码来源:DirectKafkaConsumer.scala
示例10: KafkaSource
//设置package包名称以及导入依赖的类
package com.ippontech.kafka
import com.ippontech.kafka.stores.OffsetsStore
import com.typesafe.scalalogging.slf4j.LazyLogging
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.reflect.ClassTag
object KafkaSource extends LazyLogging {
def kafkaStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag]
(ssc: StreamingContext, kafkaParams: Map[String, String], offsetsStore: OffsetsStore, topic: String): InputDStream[(K, V)] = {
val topics = Set(topic)
val storedOffsets = offsetsStore.readOffsets(topic)
val kafkaStream = storedOffsets match {
case None =>
// start from the latest offsets
KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaParams, topics)
case Some(fromOffsets) =>
// start from previously saved offsets
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](ssc, kafkaParams, fromOffsets, messageHandler)
}
// save the offsets
kafkaStream.foreachRDD(rdd => offsetsStore.saveOffsets(topic, rdd))
kafkaStream
}
// Kafka input stream
def kafkaStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag]
(ssc: StreamingContext, brokers: String, offsetsStore: OffsetsStore, topic: String): InputDStream[(K, V)] =
kafkaStream(ssc, Map("metadata.broker.list" -> brokers), offsetsStore, topic)
}
开发者ID:ippontech,项目名称:spark-kafka-source,代码行数:43,代码来源:KafkaSource.scala
注:本文中的org.apache.spark.streaming.dstream.InputDStream类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论