
Scala InputDStream Class Code Examples


This article collects typical usage examples of the Scala class org.apache.spark.streaming.dstream.InputDStream. If you are wondering what the InputDStream class is for, how it is used in practice, or simply want concrete examples, the curated code samples below should help.



Ten code examples of the InputDStream class are shown below, sorted roughly by popularity.
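
Before the Kafka-oriented examples, here is a minimal self-contained sketch (all names and settings are illustrative, not taken from any of the projects below) of the simplest way to obtain an InputDStream: StreamingContext.queueStream, the same mechanism several of the test examples further down rely on.

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream

import scala.collection.mutable.Queue

object InputDStreamQuickStart {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("InputDStreamQuickStart")
    val ssc = new StreamingContext(conf, Seconds(1))

    // queueStream is the simplest built-in InputDStream: each queued RDD becomes one micro-batch.
    val queue = Queue[RDD[String]]()
    val stream: InputDStream[String] = ssc.queueStream(queue)
    stream.print()

    queue += ssc.sparkContext.makeRDD(Seq("hello", "input", "dstream"))

    ssc.start()
    ssc.awaitTerminationOrTimeout(3000)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}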

Example 1: KafkaUtility

// Package declaration and required imports
package com.knoldus.streaming.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}


object KafkaUtility {

  //TODO It should read from config
  private val kafkaParams = Map(
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "auto.offset.reset" -> "earliest",
    "group.id" -> "tweet-consumer"
  )

  private val preferredHosts = LocationStrategies.PreferConsistent


  def createDStreamFromKafka(ssc: StreamingContext, topics: List[String]): InputDStream[ConsumerRecord[String, String]] =
    KafkaUtils.createDirectStream[String, String](
      ssc,
      preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics.distinct, kafkaParams)
    )

} 
Developer: knoldus | Project: real-time-stream-processing-engine | Lines: 32 | Source: KafkaUtility.scala
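
A hedged usage sketch for the utility above, assuming a Kafka broker on localhost:9092 and an illustrative topic name "tweets" (neither is taken from the original project):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

import com.knoldus.streaming.kafka.KafkaUtility

object KafkaUtilityExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("KafkaUtilityExample"), Seconds(5))

    // "tweets" is an illustrative topic name, not taken from the original project.
    val stream = KafkaUtility.createDStreamFromKafka(ssc, List("tweets"))
    stream.map(_.value()).print()

    ssc.start()
    ssc.awaitTermination()
  }
}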


Example 2: RsvpStreaming

// Package declaration and required imports
package com.github.mmolimar.asks.streaming

import java.util.UUID

import com.github.mmolimar.askss.common.implicits._
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}


object RsvpStreaming extends App with LazyLogging {

  val filter = config.getString("spark.filter").toLowerCase.split(",").toList
  val ssc = new StreamingContext(buildSparkConfig, Seconds(5))

  //TODO
  kafkaStream(ssc)
    .map(_.value())
    .map(_.toEvent)
    .filter(rsvp => {
      filter.exists(rsvp.event.get.event_name.contains(_))
    })
    .print()

  ssc.start()
  ssc.awaitTermination()

  def buildSparkConfig(): SparkConf = {
    new SparkConf()
      .setMaster(config.getString("spark.master"))
      .setAppName("RsvpStreaming")
      .set("spark.streaming.ui.retainedBatches", "5")
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.sql.parquet.compression.codec", "snappy")
      .set("spark.sql.parquet.mergeSchema", "true")
      .set("spark.sql.parquet.binaryAsString", "true")
  }

  def kafkaStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
    val topics = Set(config.getString("kafka.topic"))

    val kafkaParams = Map[String, Object](
      "metadata.broker.list" -> config.getString("kafka.brokerList"),
      "enable.auto.commit" -> config.getBoolean("kafka.autoCommit").toString,
      "auto.offset.reset" -> config.getString("kafka.autoOffset"),
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> config.getString("kafka.brokerList"),
      ConsumerConfig.GROUP_ID_CONFIG -> s"consumer-${UUID.randomUUID}",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer]
    )

    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)
  }

} 
Developer: mmolimar | Project: askss | Lines: 61 | Source: RsvpStreaming.scala
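
RsvpStreaming reads its settings through a Typesafe `config` object supplied by the project's implicits. A hedged sketch of the keys the code above actually accesses, with illustrative values only:

import com.typesafe.config.{Config, ConfigFactory}

object RsvpStreamingConfigSketch {
  // Keys mirror the config.getString/getBoolean calls above; every value here is illustrative.
  val config: Config = ConfigFactory.parseString(
    """
      |spark.master = "local[2]"
      |spark.filter = "scala,spark,kafka"
      |kafka.topic = "meetup-rsvps"
      |kafka.brokerList = "localhost:9092"
      |kafka.autoCommit = false
      |kafka.autoOffset = "earliest"
      |""".stripMargin)

  def main(args: Array[String]): Unit =
    println(config.getString("kafka.topic"))
}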


Example 3: KafkaEngine

// Package declaration and required imports
package com.lljv.analytics.hadoopengine

import java.util.HashMap
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import scala.util.control.NonFatal


class KafkaEngine(val settings: KafkaSettings) extends Serializable {

  var producer: Option[KafkaProducer[String, String]] = None


  def getStreamingParameters(): Map[String, String] = {
    val parameters = Map[String, String](
      "metadata.broker.list" -> settings.kafkaBroker,
      "bootstrap.servers" -> settings.kafkaBroker,
      "key.serializer" -> settings.stringSerializerType,
      "value.serializer" -> settings.stringSerializerType,
      "key.deserializer" -> settings.stringDeserializerType,
      "value.deserializer" -> settings.stringDeserializerType,
      "group.id" -> settings.consumerGroupId
    )
    return parameters
  }

  def startStream(
                   topicName: String,
                   sparkEngine: SparkStreamEngine
                 ): Option[InputDStream[ConsumerRecord[String, String]]] =
  {
    val stream: Option[InputDStream[ConsumerRecord[String, String]]] = try {
      Some(KafkaUtils.createDirectStream[String,String](
        sparkEngine.streamingContext.get,
        PreferConsistent,
        Subscribe[String, String](Array(topicName), this.getStreamingParameters())
      ))
    } catch {
      case NonFatal(exc) => {
        // printf(exc.getMessage())
        // TODO: logging
        None
      }
    }
    return stream
  }

  

} 
Developer: dotdeb | Project: Science-Finder | Lines: 56 | Source: KafkaEngine.scala
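
The KafkaEngine listing is truncated (the original file has 56 lines), so the setup of its `producer` field is not shown. As a standalone, hedged sketch of how a string-keyed KafkaProducer is typically constructed with settings like those in getStreamingParameters (broker address and topic name are illustrative):

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object ProducerSketch {
  def main(args: Array[String]): Unit = {
    // Broker address and topic name are illustrative, not taken from the original project.
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord[String, String]("events", "key", "value"))
    producer.close()
  }
}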


Example 4: ParseTicketsSpec

// Package declaration and required imports
package com.octo.nad.handson.spark.streaming

import com.octo.nad.handson.model.Produit
import com.octo.nad.handson.model.Ticket
import com.octo.nad.handson.spark.StreamingPipeline
import com.octo.nad.handson.spark.specs.SparkStreamingSpec
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream

class ParseTicketsSpec extends SparkStreamingSpec {

  "La méthode parseTickets de l'object Pipeline" should "parser des tickets JSON et renvoyer des objets de la class Ticket" in {


    val inputDstream = generateDStreamTicket

    val result = StreamingPipeline.parseTickets(inputDstream)

    var resultsRDD = scala.collection.mutable.ArrayBuffer.empty[Array[Ticket]]
    result.foreachRDD(rdd => {
      resultsRDD += rdd.collect()
    })

    ssc.start()
    ssc.awaitTerminationOrTimeout(1000)


    eventually{
      val resultArrayFromRDD = resultsRDD.flatten.toList
      resultArrayFromRDD should have size 1
      resultArrayFromRDD.head should be(ticket)
    }
  }

  private def generateDStreamTicket: InputDStream[String] = {
    val lines = scala.collection.mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)

    val jsonString = ticket.toJson

    lines += sc.makeRDD(Seq(jsonString))
    dstream
  }

  private val ticket = {
    val p1 = Produit(158, "Lessive Mir", 1, 2, 3, 2, BigDecimal(3.55))
    val p2 = Produit(89, "Dentifrice", 14, 62, 23, 2, BigDecimal(4.35))
    val p3 = Produit(10, "Brosse à dents", 21, 246, 3, 2, BigDecimal(2.12))
    val p4 = Produit(7, "Chocolats", 1664, 26, 34, 2, BigDecimal(1.50))

    val productList = p1 :: p2 :: p3 :: p4 :: Nil

    Ticket(12, 313, 33, "04:44", Some(2), 12, BigDecimal(1), productList)
  }
} 
Developer: tmouron | Project: hands-on-spark | Lines: 56 | Source: ParseTicketsSpec.scala
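
Examples 4 through 7 all extend a SparkStreamingSpec base trait that is not included in this page; it evidently provides `ssc`, `sc`, and ScalaTest's `eventually`. A hedged sketch of what such a harness might look like (the real trait may differ):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Millis, Span}
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

// Hedged sketch only: the real SparkStreamingSpec is not part of this listing and may differ.
trait SparkStreamingSpecSketch extends FlatSpec with Matchers with Eventually with BeforeAndAfterAll {

  protected var ssc: StreamingContext = _
  protected def sc: SparkContext = ssc.sparkContext

  // Give eventually enough patience for a micro-batch to complete.
  implicit override val patienceConfig: PatienceConfig =
    PatienceConfig(timeout = scaled(Span(5000, Millis)), interval = scaled(Span(100, Millis)))

  override def beforeAll(): Unit = {
    ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("streaming-spec"), Seconds(1))
  }

  override def afterAll(): Unit = {
    if (ssc != null) ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}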


Example 5: AddSectionStringSpec

// Package declaration and required imports
package com.octo.nad.handson.spark.streaming

import com.octo.nad.handson.spark.StreamingPipeline
import com.octo.nad.handson.spark.specs.SparkStreamingSpec
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream

class AddSectionStringSpec extends SparkStreamingSpec {


  "La méthode addSectionString de l'object Pipeline" should "faire la jointure entre le stream et le dataset des libellés de section pour ne plus avoir un identifiant technique de section mais un libellé" in {

    val inputDStream = generateDStreamTicket

    val result = StreamingPipeline.addSectionString(inputDStream)

    var resultsRDD = scala.collection.mutable.ArrayBuffer.empty[Array[(String, BigDecimal)]]
    result.foreachRDD(rdd => {
      resultsRDD += rdd.collect()
    })

    ssc.start()
    ssc.awaitTerminationOrTimeout(1000)
    eventually{
      val resultArrayFromRDD = resultsRDD.flatten.toList
      resultArrayFromRDD should contain ("JARDIN VEGETAL", BigDecimal(1.22))
      resultArrayFromRDD should contain ("MULTIMEDIA", BigDecimal(4.40))
    }
  }

  private def generateDStreamTicket: InputDStream[(Int, BigDecimal)] = {
    val lines = scala.collection.mutable.Queue[RDD[(Int, BigDecimal)]]()
    val dstream = ssc.queueStream(lines)

    lines += sc.makeRDD((233, BigDecimal(1.22)) :: (146, BigDecimal(4.40)) :: Nil)
    dstream
  }

} 
Developer: tmouron | Project: hands-on-spark | Lines: 40 | Source: AddSectionStringSpec.scala


Example 6: ComputeCaBySectionSpec

// Package declaration and required imports
package com.octo.nad.handson.spark.streaming

import com.octo.nad.handson.model.Produit
import com.octo.nad.handson.spark.StreamingPipeline
import com.octo.nad.handson.spark.specs.SparkStreamingSpec
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream

class ComputeCaBySectionSpec extends SparkStreamingSpec {

  "La méthode computeCaBySection de l'object Pipeline" should "renvoyer un tuple (String,Double) => (Section,Ca cumulé)" in {


    val inputDstream = generateDStreamArticle


    val result = StreamingPipeline.computeCaBySection(inputDstream)
    var resultsRDD = scala.collection.mutable.ArrayBuffer.empty[Array[(Int, BigDecimal)]]
    result.foreachRDD(rdd => {
      resultsRDD += rdd.collect()
    })
    ssc.start()
    ssc.awaitTerminationOrTimeout(1000)


    Thread.sleep(1000)
    eventually{
      val resultArrayFromRDD = resultsRDD.flatten.toList
      resultArrayFromRDD.length should be(3)
      resultArrayFromRDD should contain (1, BigDecimal(5.67))
      resultArrayFromRDD should contain (14, BigDecimal(4.35))
      resultArrayFromRDD should contain(1664, BigDecimal(1.50))
    }
  }

  val p1 = Produit(158, "Lessive Mir", 1, 2, 3, 2, BigDecimal(3.55))
  val p2 = Produit(89, "Dentifrice", 14, 62, 23, 2, BigDecimal(4.35))
  val p3 = Produit(10, "Brosse à dents", 1, 246, 3, 2, BigDecimal(2.12))
  val p4 = Produit(7, "Chocolats", 1664, 26, 34, 2, BigDecimal(1.50))

  private def generateDStreamArticle: InputDStream[Produit] = {
    val lines = scala.collection.mutable.Queue[RDD[Produit]]()
    val dstream = ssc.queueStream(lines)

    lines += sc.makeRDD(Seq(p1,p2,p3,p4))
    dstream
  }
} 
Developer: tmouron | Project: hands-on-spark | Lines: 49 | Source: ComputeCaBySectionSpec.scala


Example 7: ExplodeTicketsSpec

// Package declaration and required imports
package com.octo.nad.handson.spark.streaming

import com.octo.nad.handson.model.Produit
import com.octo.nad.handson.model.Ticket
import com.octo.nad.handson.spark.StreamingPipeline
import com.octo.nad.handson.spark.specs.SparkStreamingSpec
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream

class ExplodeTicketsSpec extends SparkStreamingSpec {

  "La méthode explodeTickets de l'object Pipeline" should "exploser les tickets JSON en lignes produit" in {


    val inputDstream = generateDStreamTicket


    val result = StreamingPipeline.explodeTickets(inputDstream)

    var resultsRDD = scala.collection.mutable.ArrayBuffer.empty[Array[Produit]]
    result.foreachRDD(rdd => {
      resultsRDD += rdd.collect()
    })

    ssc.start()
    ssc.awaitTerminationOrTimeout(1000)


    eventually{
      val resultArrayFromRDD = resultsRDD.flatten.toList
      resultArrayFromRDD should have size 4
      resultArrayFromRDD should contain(p1)
      resultArrayFromRDD should contain(p2)
      resultArrayFromRDD should contain(p3)
      resultArrayFromRDD should contain(p4)
    }
  }

  private def generateDStreamTicket: InputDStream[Ticket] = {
    val lines = scala.collection.mutable.Queue[RDD[Ticket]]()
    val dstream = ssc.queueStream(lines)

    lines += sc.makeRDD(ticket :: Nil)
    dstream
  }

  private val p1 = Produit(158, "Lessive Mir", 1, 2, 3, 2, BigDecimal(3.55))
  private val p2 = Produit(89, "Dentifrice", 14, 62, 23, 2, BigDecimal(4.35))
  private val p3 = Produit(10, "Brosse à dents", 21, 246, 3, 2, BigDecimal(2.12))
  private val p4 = Produit(7, "Chocolats", 1664, 26, 34, 2, BigDecimal(1.50))

  private val ticket = {

    val productList = p1 :: p2 :: p3 :: p4 :: Nil

    Ticket(12, 313, 33, "04:44", Some(2), 12, BigDecimal(1), productList)
  }
} 
Developer: tmouron | Project: hands-on-spark | Lines: 59 | Source: ExplodeTicketsSpec.scala


Example 8: TestableQueueInputDStream

// Package declaration and required imports
package org.apache.spark.streaming

import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming.dstream.InputDStream

import scala.collection.mutable.{ArrayBuffer, Queue}
import scala.reflect.ClassTag

class TestableQueueInputDStream[T: ClassTag](
                                              ssc: StreamingContext,
                                              val queue: Queue[RDD[T]],
                                              oneAtATime: Boolean,
                                              defaultRDD: RDD[T]
                                              ) extends InputDStream[T](ssc) {

  override def start() { }

  override def stop() { }

  private def readObject(in: ObjectInputStream): Unit = {
    logWarning("queueStream doesn't support checkpointing")
  }

  private def writeObject(oos: ObjectOutputStream): Unit = {
    logWarning("queueStream doesn't support checkpointing")
  }

  override def compute(validTime: Time): Option[RDD[T]] = {
    val buffer = new ArrayBuffer[RDD[T]]()
    queue.synchronized {
      if (oneAtATime && queue.nonEmpty) {
        buffer += queue.dequeue()
      } else {
        buffer ++= queue
        queue.clear()
      }
    }
    if (buffer.nonEmpty) {
      if (oneAtATime) {
        Some(buffer.head)
      } else {
        Some(new UnionRDD(context.sc, buffer.toSeq))
      }
    } else if (defaultRDD != null) {
      Some(defaultRDD)
    } else {
      Some(ssc.sparkContext.emptyRDD)
    }
  }

} 
Developer: tmalaska | Project: SparkUnitTestingExamples | Lines: 54 | Source: TestableQueueInputDStream.scala
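
A hedged usage sketch for the queue-backed stream above; it sits in the same org.apache.spark.streaming package as the class, and all other names and settings are illustrative:

package org.apache.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

import scala.collection.mutable.Queue

object TestableQueueInputDStreamExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("queue-test"), Seconds(1))

    val queue = Queue[RDD[Int]]()
    queue += ssc.sparkContext.makeRDD(1 to 5)

    // oneAtATime = true: one queued RDD per batch; defaultRDD = null falls back to an empty RDD.
    val stream = new TestableQueueInputDStream[Int](ssc, queue, oneAtATime = true, defaultRDD = null)
    stream.print()

    ssc.start()
    ssc.awaitTerminationOrTimeout(3000)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}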


Example 9: updateZookeeper

// Package declaration and required imports
package com.groupon.dse.consumers

import java.util.Properties

import com.groupon.dse.configs.PluginConfigs
import com.groupon.dse.kafka.common.{State, WrappedMessage}
import com.groupon.dse.kafka.controllers.{StateController, StateControllerBuilder}
import com.groupon.dse.spark.plugins.ReceiverPlugin
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.TaskContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.slf4j.LoggerFactory


// Note: the enclosing object was truncated in the original listing; the name below is assumed from the source file name (DirectKafkaConsumer.scala).
object DirectKafkaConsumer {

  def updateZookeeper(messages: InputDStream[(String, Array[Byte])], stateController: StateController, appConfigs: Properties): Unit = {
    messages.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { _ =>
        val offsetRange = offsetRanges(TaskContext.get.partitionId)
        val topic = offsetRange.topic
        val partitionId = offsetRange.partition
        val zkPath = s"${appConfigs.getProperty("statecontroller.zk.root")}/$topic/$partitionId"
        stateController.setState(zkPath,State(offsetRange.untilOffset+1, System.currentTimeMillis()))
        stateController.close()
      }
    }
  }
} 
Developer: groupon | Project: schema-inferer | Lines: 31 | Source: DirectKafkaConsumer.scala


Example 10: KafkaSource

// Package declaration and required imports
package com.ippontech.kafka

import com.ippontech.kafka.stores.OffsetsStore
import com.typesafe.scalalogging.slf4j.LazyLogging
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

import scala.reflect.ClassTag

object KafkaSource extends LazyLogging {

  def kafkaStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag]
  (ssc: StreamingContext, kafkaParams: Map[String, String], offsetsStore: OffsetsStore, topic: String): InputDStream[(K, V)] = {

    val topics = Set(topic)

    val storedOffsets = offsetsStore.readOffsets(topic)
    val kafkaStream = storedOffsets match {
      case None =>
        // start from the latest offsets
        KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaParams, topics)
      case Some(fromOffsets) =>
        // start from previously saved offsets
        val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
        KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](ssc, kafkaParams, fromOffsets, messageHandler)
    }

    // save the offsets
    kafkaStream.foreachRDD(rdd => offsetsStore.saveOffsets(topic, rdd))

    kafkaStream
  }

  // Kafka input stream
  def kafkaStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag]
  (ssc: StreamingContext, brokers: String, offsetsStore: OffsetsStore, topic: String): InputDStream[(K, V)] =
    kafkaStream(ssc, Map("metadata.broker.list" -> brokers), offsetsStore, topic)

} 
Developer: ippontech | Project: spark-kafka-source | Lines: 43 | Source: KafkaSource.scala
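
For reference, a minimal standalone sketch of the underlying spark-streaming-kafka-0-8 call that KafkaSource wraps when no offsets have been stored (broker and topic names are illustrative):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamWithoutOffsetStore {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("kafka-direct"), Seconds(5))

    // Broker and topic are illustrative; "metadata.broker.list" is the 0.8-era parameter name.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val stream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set("my-topic"))

    stream.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}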



Note: The org.apache.spark.streaming.dstream.InputDStream examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets come from open-source projects contributed by their respective authors, and copyright remains with those authors; please consult each project's license before redistributing or reusing the code. Do not republish this article without permission.

