本文整理汇总了Scala中org.apache.spark.streaming.kafka010.KafkaUtils类的典型用法代码示例。如果您正苦于以下问题:Scala KafkaUtils类的具体用法?Scala KafkaUtils怎么用?Scala KafkaUtils使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了KafkaUtils类的4个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: KafkaUtility
//设置package包名称以及导入依赖的类
package com.knoldus.streaming.kafka
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
/** Factory for Kafka-backed direct input streams used by Spark Streaming jobs. */
object KafkaUtility {

  // TODO: read these settings from configuration instead of hard-coding them.
  // The value type is spelled out as Object because the map mixes String and
  // Class values; leaving it to inference produces an accidental
  // least-upper-bound type instead of the type the consumer strategy expects.
  private val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "auto.offset.reset" -> "earliest",
    "group.id" -> "tweet-consumer"
  )

  private val preferredHosts = LocationStrategies.PreferConsistent

  /**
   * Creates a direct input stream subscribed to the given topics.
   *
   * @param ssc    streaming context the stream is attached to
   * @param topics topic names to subscribe to; duplicates are removed first
   * @return input stream of consumer records with String keys and values
   */
  def createDStreamFromKafka(
      ssc: StreamingContext,
      topics: List[String]
  ): InputDStream[ConsumerRecord[String, String]] =
    KafkaUtils.createDirectStream[String, String](
      ssc,
      preferredHosts,
      ConsumerStrategies.Subscribe[String, String](topics.distinct, kafkaParams)
    )
}
开发者ID:knoldus,项目名称:real-time-stream-processing-engine,代码行数:32,代码来源:KafkaUtility.scala
示例2: Consumer
//导入依赖的类
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.mllib.classification.SVMModel
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SparkSession
/**
 * Streaming job that reads comma-separated records from the "streaming" Kafka
 * topic and prints, for each record, the prediction of a previously saved
 * SVM model.
 */
object Consumer {

  /**
   * Converts one comma-separated record into the model's feature array.
   *
   * The first column is skipped *before* parsing, so it never has to be
   * numeric (presumably it is a label or identifier — confirm against the
   * producer). All remaining columns must parse as Double.
   *
   * @param line raw record value as received from Kafka
   * @return the parsed values of every column after the first
   * @throws NumberFormatException if a remaining column is not a number
   */
  private def toFeatures(line: String): Array[Double] =
    line.split(',').drop(1).map(_.toDouble)

  /** Entry point: wires up the Kafka stream, loads the model and starts the job. */
  def main(args: Array[String]): Unit = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("streaming")

    val sparkConf = new SparkConf().setMaster("local[8]").setAppName("KafkaTest")
    val streamingContext = new StreamingContext(sparkConf, Seconds(1))

    // Create a direct input stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Reuse the SparkContext owned by the streaming context rather than
    // building a second session just to obtain a context.
    val model = SVMModel.load(streamingContext.sparkContext, "/home/xiaoyu/model")

    // Only the record payload is needed; the key is not used.
    val lines = kafkaStream.map(record => record.value)

    // Each batch is brought to the driver so predictions are printed there.
    lines.foreachRDD { batch =>
      batch.collect().foreach { line =>
        println(model.predict(Vectors.dense(toFeatures(line))))
      }
    }

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
开发者ID:XiaoyuGuo,项目名称:DataFusionClass,代码行数:55,代码来源:Consumer.scala
示例3: StatefulWordcount
//设置package包名称以及导入依赖的类
package com.test.spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.KafkaUtils
/**
 * Stateful word count over the "widas" Kafka topic: every 10-second batch is
 * split into words and the per-word counts are added to a running total.
 *
 * Uses an explicit `main` instead of `extends App`: with `App` the body runs
 * through delayed initialization, which is a known source of
 * initialization-order problems for Spark driver programs.
 */
object StatefulWordcount {

  /** Entry point: builds the streaming pipeline and runs it until terminated. */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Stateful Wordcount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    // updateStateByKey keeps state across batches and needs a checkpoint
    // directory (see the Spark Streaming programming guide).
    ssc.checkpoint("/Users/nagainelu/bigdata/jobs/WordCount_checkpoint")

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "mygroup",
      "auto.offset.reset" -> "earliest"
    )
    val topics = Set("widas")

    val inputKafkaStream = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // Record values are already Strings, so they can be split directly.
    val words = inputKafkaStream.flatMap(record => record.value.split(" "))
    val wordpairs = words.map(word => (word, 1))

    // New total for a word = sum of this batch's counts + previous total (0 if none).
    val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
      (values, state) => Some(values.sum + state.getOrElse(0))

    val wordCounts = wordpairs.reduceByKey(_ + _).updateStateByKey[Int](updateFunc)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
开发者ID:malli3131,项目名称:SparkApps,代码行数:31,代码来源:StatefulWordcount.scala
示例4: RsvpStreaming
//设置package包名称以及导入依赖的类
package com.github.mmolimar.asks.streaming
import java.util.UUID
import com.github.mmolimar.askss.common.implicits._
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
/**
 * Streaming job that reads RSVP messages from Kafka, converts each message to
 * an event object and prints those whose event name contains one of the
 * configured filter terms.
 *
 * Uses an explicit `main` instead of `extends App`, so the filter terms are a
 * local value captured by the stream closure rather than a delayed-init field
 * of this object.
 */
object RsvpStreaming extends LazyLogging {

  /** Entry point: builds the pipeline and runs it until terminated. */
  def main(args: Array[String]): Unit = {
    // Comma-separated filter terms, lower-cased.
    // NOTE(review): event names are not lower-cased before matching — confirm
    // whether the comparison is meant to be case-insensitive.
    val keywords = config.getString("spark.filter").toLowerCase.split(",").toList
    val ssc = new StreamingContext(buildSparkConfig(), Seconds(5))

    //TODO
    kafkaStream(ssc)
      .map(_.value())
      .map(_.toEvent)
      // RSVPs without event data are skipped instead of failing the batch
      // (assumes `event` is an Option — it was accessed via `.get`).
      .filter { rsvp =>
        rsvp.event.exists(event => keywords.exists(keyword => event.event_name.contains(keyword)))
      }
      .print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Builds the Spark configuration for this job.
   *
   * @return configuration with the master taken from `spark.master` and fixed
   *         streaming/parquet settings
   */
  def buildSparkConfig(): SparkConf = {
    new SparkConf()
      .setMaster(config.getString("spark.master"))
      .setAppName("RsvpStreaming")
      .set("spark.streaming.ui.retainedBatches", "5")
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.sql.parquet.compression.codec", "snappy")
      .set("spark.sql.parquet.mergeSchema", "true")
      .set("spark.sql.parquet.binaryAsString", "true")
  }

  /**
   * Creates a direct Kafka stream for the topic configured under `kafka.topic`.
   *
   * A fresh random consumer group id is generated on every call.
   *
   * @param ssc streaming context the stream is attached to
   * @return input stream of consumer records with String keys and values
   */
  def kafkaStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
    val topics = Set(config.getString("kafka.topic"))
    val brokers = config.getString("kafka.brokerList")
    val kafkaParams = Map[String, Object](
      // NOTE(review): same broker list as bootstrap.servers below; check
      // whether this older-style key is still needed.
      "metadata.broker.list" -> brokers,
      "enable.auto.commit" -> config.getBoolean("kafka.autoCommit").toString,
      "auto.offset.reset" -> config.getString("kafka.autoOffset"),
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> s"consumer-${UUID.randomUUID}",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer]
    )
    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      consumerStrategy
    )
  }
}
开发者ID:mmolimar,项目名称:askss,代码行数:61,代码来源:RsvpStreaming.scala
注:本文中的org.apache.spark.streaming.kafka010.KafkaUtils类示例整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论