本文整理汇总了 Scala 中 org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 类的典型用法代码示例。如果您正苦于以下问题：Scala Subscribe 类的具体用法？Scala Subscribe 怎么用？Scala Subscribe 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了 Subscribe 类的 6 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Scala 代码示例。
示例1: Consumer
//设置package包名称以及导入依赖的类
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.mllib.classification.SVMModel
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SparkSession
/**
 * Reads comma-separated records from the Kafka topic "streaming" and prints, for every
 * record, the prediction of an SVM model loaded from disk.
 *
 * The first comma-separated field of each record value is discarded (presumably a label or
 * id column -- confirm against the producer); the remaining fields are parsed as doubles and
 * used as the feature vector.
 */
object Consumer {

  /**
   * Builds the streaming pipeline, starts it and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("streaming")
    val sparkConf = new SparkConf().setMaster("local[8]").setAppName("KafkaTest")
    val streamingContext = new StreamingContext(sparkConf, Seconds(1))

    // Create an input direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // getOrCreate() picks up the SparkContext already started by the StreamingContext,
    // so the same configuration object is passed instead of repeating master/app name.
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    val model = SVMModel.load(spark.sparkContext, "/home/xiaoyu/model")

    // Only the record value is needed for prediction, so the key is not shipped to the driver.
    kafkaStream.map(record => record.value).foreachRDD { batch =>
      batch.collect().foreach { line =>
        // Drop the leading field BEFORE parsing: converting it first would throw a
        // NumberFormatException whenever that column is not numeric, even though it is unused.
        val features = line.split(',').tail.map(_.toDouble)
        println(model.predict(Vectors.dense(features)))
      }
    }

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
开发者ID:XiaoyuGuo,项目名称:DataFusionClass,代码行数:55,代码来源:Consumer.scala
示例2: Persister2
//设置package包名称以及导入依赖的类
import java.util.{Calendar, UUID}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
/**
 * Consumes the Kafka topic "AnalyzedData" and, for every micro-batch, parses the record
 * values as JSON and writes the resulting rows as text files under
 * `data/<run id>/<batch number>`.
 */
object Persister2 extends App {

  /**
   * Sets up the streaming job, starts it and blocks until the streaming context terminates.
   * A fresh random UUID identifies the output directory of this run.
   */
  def persist() = {
    // Unique identifier of this run; used as output sub-directory and log prefix.
    val runId = UUID.randomUUID().toString()

    // Offsets are reset to "earliest" (the alternative "latest" was disabled by the author).
    val consumerSettings = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val sparkConf = new SparkConf().setAppName("Wordcount").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val subscription = Subscribe[String, String](Array("AnalyzedData"), consumerSettings)
    val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, subscription)

    // Running batch number; incremented once per processed micro-batch.
    val batchCounter: AtomicInteger = new AtomicInteger(0)

    stream.foreachRDD { rdd =>
      val values = rdd.map(_.value())
      val json: DataFrame = SparkSession.builder().getOrCreate().read.json(values)
      val outputDir = s"data/$runId/${batchCounter.incrementAndGet()}"
      json.rdd.saveAsTextFile(outputDir)
      println(s"$runId: ${Calendar.getInstance.getTime}: Found: ${rdd.count()} lines")
    }

    println("starting")
    ssc.start()
    println("awaiting")
    ssc.awaitTermination()
    println("terminated")
  }

  persist()
}
开发者ID:Tiloon,项目名称:Movie-processing,代码行数:53,代码来源:Persister2.scala
示例3: Streaming
//设置package包名称以及导入依赖的类
package Pipeline
import java.util.{Calendar, Properties}
import collection.JavaConverters._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json.JSONObject
import com.typesafe.scalalogging.slf4j.LazyLogging
import org.apache.log4j.BasicConfigurator
/**
 * Wires a Kafka producer and a Spark Streaming direct stream together: for every micro-batch
 * read from `receiveTopic` it averages `LastTradePrice` per `StockSymbol` and publishes one
 * JSON message per symbol to `sendTopic`.
 *
 * The constructor only registers the processing; this class does not start `ssc` itself.
 *
 * @param server       Kafka bootstrap server list, used for both producer and consumer
 * @param receiveTopic topic the stream subscribes to
 * @param sendTopic    topic the averaged prices are sent to
 */
class Streaming(var server: String, var receiveTopic: String, var sendTopic: String) extends LazyLogging {

  // Producer side: plain string keys and values.
  val props = new Properties()
  props.put("bootstrap.servers", server)
  props.put("client.id", "Streaming")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)

  BasicConfigurator.configure()

  // Consumer side: a 2-second micro-batch stream on a local master.
  val conf = new SparkConf().setAppName("DataPipeline").setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(2))
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> server,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "auto.offset.reset" -> "latest",
    "group.id" -> "use_a_separate_group_id_for_each_stream"
  )
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](Array(receiveTopic), kafkaParams)
  )

  stream.foreachRDD { rdd =>
    // (symbol, (price, 1)) pairs so that sum and count can be reduced together.
    // The lambdas below are kept inline so they do not reference members of this class.
    val quotes = rdd.map { record =>
      val quote = new JSONObject(record.value())
      (quote.get("StockSymbol"), (quote.get("LastTradePrice").toString.toFloat, 1))
    }
    val totals = quotes.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
    val averages = totals.map { case (symbol, (sum, count)) => (symbol, sum / count) }

    // Results are brought back to the driver, where the producer lives.
    averages.collect().foreach { case (symbol, average) =>
      val msg = new JSONObject(Map("StockSymbol" -> symbol, "LastTradePrice" -> average,
        "SendTime" -> System.currentTimeMillis.toDouble / 1000).asJava)
      producer.send(new ProducerRecord[String, String](sendTopic, msg.toString))
      logger.info("Successfully send the averaged price " + msg)
    }
  }
}
开发者ID:Dukecat0613,项目名称:BigDataPipeline,代码行数:59,代码来源:Streaming.scala
示例4: Spark
//设置package包名称以及导入依赖的类
package uk.co.bitcat.streaming.spark
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.avg
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
/**
 * Pollution monitor: reads `time,pollution` records from the Kafka topic "pollution",
 * averages the pollution value over each 10-second batch and prints an alert whenever the
 * average exceeds 75.0.
 */
object Spark {

  /** One parsed input record: the raw time field and the integer pollution reading. */
  private case class Measurement(time: String, pollution: Int)

  /**
   * Builds the streaming job, starts it and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Pollution Monitor").setMaster("local[*]")

    // Setting the batch interval over which we perform our pollution average calculation
    val streamingContext = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "pollution_consumer",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Creating a stream to read from Kafka
    val topics = Array("pollution")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Calculate the pollution average over the last interval
    stream.foreachRDD { rdd =>
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      rdd
        .map(_.value.split(","))
        .map(attributes => Measurement(attributes(0).trim, attributes(1).trim.toInt))
        .toDF()
        .agg(avg($"pollution") as "pollutionAverage")
        .filter($"pollutionAverage" > 75.0)
        // The aggregate is at most one row; collect it so the alert is printed by the
        // driver rather than inside a task running on an executor.
        .collect()
        .foreach(row => println("Raise alert for pollution level: " + row(0)))
    }

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
开发者ID:dscook,项目名称:streaming-examples,代码行数:58,代码来源:Spark.scala
示例5: Main
//设置package包名称以及导入依赖的类
package onextent.eventhubs.publisher
import com.microsoft.azure.eventhubs.EventData
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Forwards every record value read from a Kafka topic to `EhPublisher.ehClient` as an
 * `EventData` payload. Broker list, consumer group, topic and batch duration come from the
 * "main" section of the Typesafe configuration.
 */
object Main extends Serializable with LazyLogging {

  /**
   * Builds the streaming job from configuration, starts it and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load().getConfig("main")

    // The Spark context is created first; the batch duration is read from config afterwards.
    val sparkContext = new SparkContext(new SparkConf().set("spark.cores.max", "2"))
    val batchInterval = Seconds(config.getString("kafka.batchDuration").toInt)
    val ssc = new StreamingContext(sparkContext, batchInterval)

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> config.getString("kafka.brokerList"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> config.getString("kafka.consumerGroup"),
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val subscription = Subscribe[String, String](Array(config.getString("kafka.topic")), kafkaParams)
    val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, subscription)

    // Only the record value is published; each value becomes one event.
    val payloads = stream.map(_.value)
    payloads.foreachRDD { rdd =>
      rdd.foreach { payload =>
        EhPublisher.ehClient.send(new EventData(payload.getBytes("UTF8")))
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
开发者ID:navicore,项目名称:EventHubsKafkaPublisher,代码行数:54,代码来源:Main.scala
示例6: SparkStreamMain
//设置package包名称以及导入依赖的类
package io.bigfast.tracking.grpc
import io.bigfast.tracking.Event
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Reads raw byte records from the Kafka topic "event", decodes each value with
 * `Event.parseFrom` and prints `(id, event)` pairs, where the id is the record key decoded
 * as text or "empty" when the record has no key.
 */
object SparkStreamMain {

  /**
   * Builds the streaming job, starts it and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    import java.nio.charset.StandardCharsets

    val conf = new SparkConf().setMaster("spark://spark-master:7077").setAppName("SparkStreamMain")
    val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka:9092",
      "key.deserializer" -> classOf[ByteArrayDeserializer],
      "value.deserializer" -> classOf[ByteArrayDeserializer],
      "group.id" -> "my.spark.stream",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("event")
    val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
      ssc,
      PreferConsistent,
      Subscribe[Array[Byte], Array[Byte]](topics, kafkaParams)
    )

    stream.map { record =>
      // Decode with an explicit charset: the single-argument String constructor uses the
      // JVM's platform default, which may differ between machines.
      // NOTE(review): assumes keys were written as UTF-8 text -- confirm with the producer.
      val id = Option(record.key)
        .map(bytes => new String(bytes, StandardCharsets.UTF_8))
        .getOrElse("empty")
      val event = Event.parseFrom(record.value)
      println(s"id: ${id} event: ${event.toString}")
      (id, event)
    }.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
开发者ID:kykl,项目名称:pba,代码行数:44,代码来源:SparkStreamMain.scala
注：本文中的 org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 类示例整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论