本文整理汇总了Scala中kafka.serializer.DefaultDecoder类的典型用法代码示例。如果您正苦于以下问题：Scala DefaultDecoder类的具体用法？Scala DefaultDecoder怎么用？Scala DefaultDecoder使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了DefaultDecoder类的8个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: StreamingApp
//设置package包名称以及导入依赖的类
package spark.test
import data.processing.avro.AvroDecoder
import kafka.serializer.StringDecoder
import kafka.serializer.DefaultDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
/**
 * Spark Streaming entry point that reads the `test` topic through a direct Kafka stream
 * (String keys, raw byte-array values) in one-second batches and prints every record
 * after passing its value through `AvroDecoder.decode`.
 */
object StreamingApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Simple Streaming Application")
    val streamingContext = new StreamingContext(sparkConf, Seconds(1))

    val topics = "test".split(",").toSet
    val brokerSettings = Map[String, String]("metadata.broker.list" -> "localhost:9092")

    val records = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      streamingContext,
      brokerSettings,
      topics
    )

    records.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // One decoder per partition: it is created inside the partition closure,
        // not on the driver.
        val decoder = new AvroDecoder("/event-record.json")
        partition.foreach { case (key, payload) =>
          println((key, decoder.decode(payload)))
        }
      }
    }

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
开发者ID:ipogudin,项目名称:data-processing-examples,代码行数:36,代码来源:StreamingApp.scala
示例2: KafkaPayload
//设置package包名称以及导入依赖的类
package tools
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
/**
 * Carrier for the raw bytes of a single Kafka message value.
 *
 * Note: `value` is an `Array[Byte]`, so the generated `equals`/`hashCode` compare the
 * array by reference, not by content.
 */
case class KafkaPayload(value: Array[Byte])
/**
 * Builds direct Kafka DStreams from a fixed set of Kafka parameters.
 *
 * @param config Kafka parameters handed unchanged to `KafkaUtils.createDirectStream`
 */
class KafkaDStreamSource(config: Map[String, String]) {

  /**
   * Creates a direct stream over a single topic and keeps only the message values.
   *
   * @param ssc   streaming context the stream is attached to
   * @param topic the one topic to subscribe to
   * @return a stream of [[KafkaPayload]] wrapping each record's value bytes
   */
  def createSource(ssc: StreamingContext, topic: String): DStream[KafkaPayload] = {
    val keyedRecords =
      KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
        ssc,
        config,
        Set(topic)
      )

    // The key is discarded; only the value bytes are forwarded downstream.
    keyedRecords.map { case (_, valueBytes) => KafkaPayload(valueBytes) }
  }
}
/** Companion that allows `KafkaDStreamSource(config)` without the `new` keyword. */
object KafkaDStreamSource {

  /**
   * @param config Kafka parameters for the new source
   * @return a fresh [[KafkaDStreamSource]] built from `config`
   */
  def apply(config: Map[String, String]): KafkaDStreamSource = {
    val source = new KafkaDStreamSource(config)
    source
  }
}
开发者ID:Antwnis,项目名称:kafka-streaming-examples,代码行数:29,代码来源:KafkaDStreamSource.scala
示例3: KafkaConsumer
//设置package包名称以及导入依赖的类
package Services
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException, Whitelist}
import kafka.serializer.DefaultDecoder
/**
 * Small wrapper around Kafka's ZooKeeper-based consumer connector that reads messages
 * from the topics matching `topic` and exposes them one at a time as strings.
 *
 * The connector is created eagerly in the constructor; call [[close]] when done.
 *
 * @param topic            topic name, used as a whitelist filter
 * @param groupId          consumer group id (`group.id`)
 * @param zookeeperConnect ZooKeeper connection string (`zookeeper.connect`)
 */
class KafkaConsumer(topic: String, groupId: String, zookeeperConnect: String) {
  import java.nio.charset.StandardCharsets
  import scala.util.control.NonFatal

  private val props = new Properties()
  props.put("group.id", groupId)
  props.put("zookeeper.connect", zookeeperConnect)
  props.put("auto.offset.reset", "smallest")
  // With a 500 ms consumer timeout the iterator throws ConsumerTimeoutException
  // instead of blocking forever; hasNext() turns that into `false`.
  props.put("consumer.timeout.ms", "500")
  props.put("auto.commit.interval.ms", "500")

  private val config = new ConsumerConfig(props)
  private val connector = Consumer.create(config)
  private val filterSpec = new Whitelist(topic)
  // A single stream is requested, so index 0 is the only one available.
  private val streams =
    connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())(0)

  lazy val iterator = streams.iterator()

  /**
   * Reads the next message, if any.
   *
   * @return `Some(text)` with the payload decoded as UTF-8, or `None` when no message
   *         arrived before the consumer timeout, the payload was null, or a non-fatal
   *         error occurred (the error is printed to stderr)
   */
  def read(): Option[String] =
    try {
      if (hasNext()) {
        println("Getting message from queue.............")
        // Option(...) guards against null payloads; the charset is explicit so the
        // result does not depend on the JVM's platform default encoding.
        Option(iterator.next().message()).map(bytes => new String(bytes, StandardCharsets.UTF_8))
      } else {
        None
      }
    } catch {
      // NonFatal lets OutOfMemoryError, InterruptedException, etc. propagate.
      case NonFatal(ex) =>
        ex.printStackTrace()
        None
    }

  /** True when a message is available; false on consumer timeout or non-fatal error. */
  private def hasNext(): Boolean =
    try
      iterator.hasNext()
    catch {
      case _: ConsumerTimeoutException =>
        false
      case NonFatal(ex) =>
        println("Getting error when reading message ")
        ex.printStackTrace()
        false
    }

  /** Shuts down the underlying consumer connector. */
  def close(): Unit = connector.shutdown()
}
开发者ID:pranjut,项目名称:consul-kafka-microservices,代码行数:58,代码来源:KafkaConsumer.scala
示例4: KafkaConsumer
//设置package包名称以及导入依赖的类
package com.knoldus.kafka.consumer
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException, Whitelist}
import kafka.serializer.DefaultDecoder
/**
 * Wrapper around Kafka's ZooKeeper-based consumer connector that reads messages from
 * the topics matching `topic` and returns them one at a time as strings.
 *
 * The connector is created eagerly in the constructor; call [[close]] when done.
 *
 * @param topic            topic name, used as a whitelist filter
 * @param groupId          consumer group id (`group.id`)
 * @param zookeeperConnect ZooKeeper connection string (`zookeeper.connect`)
 */
class KafkaConsumer(topic: String, groupId: String, zookeeperConnect: String) {
  import java.nio.charset.StandardCharsets

  private val props = new Properties()
  props.put("group.id", groupId)
  props.put("zookeeper.connect", zookeeperConnect)
  props.put("auto.offset.reset", "smallest")
  //2 minute consumer timeout
  props.put("consumer.timeout.ms", "120000")
  //commit after each 10 second
  props.put("auto.commit.interval.ms", "10000")

  private val config = new ConsumerConfig(props)
  private val connector = Consumer.create(config)
  private val filterSpec = new Whitelist(topic)
  // A single stream is requested, so index 0 is the only one available.
  private val streams =
    connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())(0)

  lazy val iterator = streams.iterator()

  /**
   * Reads the next message, if any.
   *
   * @return `Some(text)` with the payload decoded as UTF-8, or `None` when no message
   *         arrived before the consumer timeout, the payload was null, or an exception
   *         was thrown (its stack trace is printed)
   */
  def read(): Option[String] =
    try {
      if (hasNext()) {
        println("Getting message from queue.............")
        decode(iterator.next().message())
      } else {
        None
      }
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        None
    }

  /**
   * Decodes a payload as UTF-8. A null payload yields `None` rather than relying on a
   * caught NullPointerException, and the explicit charset keeps the result independent
   * of the JVM's platform default encoding.
   */
  private def decode(payload: Array[Byte]): Option[String] =
    Option(payload).map(bytes => new String(bytes, StandardCharsets.UTF_8))

  /** True when a message is available; false on consumer timeout or any other exception. */
  private def hasNext(): Boolean =
    try
      iterator.hasNext()
    catch {
      case _: ConsumerTimeoutException =>
        false
      case ex: Exception =>
        ex.printStackTrace()
        println("Getting error when reading message ")
        false
    }

  /** Shuts down the underlying consumer connector. */
  def close(): Unit = connector.shutdown()
}
开发者ID:knoldus,项目名称:activator-kafka-scala-producer-consumer.g8,代码行数:59,代码来源:KafkaConsumer.scala
示例5: createStream
//设置package包名称以及导入依赖的类
package it.agilelab.bigdata.wasp.consumers.readers
import it.agilelab.bigdata.wasp.core.WaspSystem
import it.agilelab.bigdata.wasp.core.WaspSystem._
import it.agilelab.bigdata.wasp.core.kafka.CheckOrCreateTopic
import it.agilelab.bigdata.wasp.core.logging.WaspLogger
import it.agilelab.bigdata.wasp.core.models.{DefaultConfiguration, TopicModel}
import it.agilelab.bigdata.wasp.core.utils.{AvroToJsonUtil, ConfigManager, JsonToByteArrayUtil}
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
//TODO: check warning (not understood)
/**
 * Creates a receiver-based Kafka stream for `topic` and converts every message value
 * to a string.
 *
 * The topic is first checked (or created) by sending a `CheckOrCreateTopic` request to
 * the Kafka admin actor; presumably `??` blocks until the actor answers — TODO confirm.
 *
 * @param group consumer group id added to the Kafka parameters as `group.id`
 * @param topic topic model supplying name, partitions, replicas and data type
 * @param ssc   implicit streaming context the stream is attached to
 * @return a stream of strings: values go through `JsonToByteArrayUtil.byteArrayToJson`
 *         when `topic.topicDataType` is `"json"`, and through
 *         `AvroToJsonUtil.avroToJson` for `"avro"` and every other data type
 * @throws Exception when the admin actor reports that the topic is not available
 */
def createStream(group: String, topic: TopicModel)(implicit ssc: StreamingContext): DStream[String] = {
  val kafkaConfig = ConfigManager.getKafkaConfig
  val kafkaConfigMap: Map[String, String] = Map(
    "zookeeper.connect" -> kafkaConfig.zookeeper.toString,
    "zookeeper.connection.timeout.ms" -> kafkaConfig.zookeeper.timeout.getOrElse(DefaultConfiguration.timeout).toString
  )

  if (??[Boolean](WaspSystem.getKafkaAdminActor, CheckOrCreateTopic(topic.name, topic.partitions, topic.replicas))) {
    val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc,
      kafkaConfigMap + ("group.id" -> group),
      // Per the KafkaUtils.createStream contract the map value is the number of
      // consumer threads for that topic.
      Map(topic.name -> 3),
      StorageLevel.MEMORY_AND_DISK_2
    )

    // Only the value is needed, so convert it directly instead of building a
    // (key, converted) tuple and dropping the key in a second map stage.
    topic.topicDataType match {
      case "json" => receiver.map(record => JsonToByteArrayUtil.byteArrayToJson(record._2))
      // "avro" and any unrecognised data type are both handled as Avro.
      case _ => receiver.map(record => AvroToJsonUtil.avroToJson(record._2))
    }
  } else {
    logger.error(s"Topic not found on Kafka: $topic")
    throw new Exception(s"Topic not found on Kafka: $topic")
  }
}
}
开发者ID:agile-lab-dev,项目名称:wasp,代码行数:47,代码来源:KafkaReader.scala
示例6: StreamConsumer
//设置package包名称以及导入依赖的类
package example.consumer
import kafka.consumer.{Consumer => KafkaConsumer, ConsumerIterator, Whitelist}
import kafka.serializer.{DefaultDecoder, Decoder}
import scala.collection.JavaConversions._
// Consumer that subscribes to a list of topics through a whitelist filter and exposes
// the incoming messages as a Stream[String].
// NOTE(review): `Consumer` here is not kafka.consumer.Consumer (that one is imported
// under the alias KafkaConsumer); it is a base class defined elsewhere, and `config`
// is presumably inherited from it — TODO confirm.
case class StreamConsumer(topics: List[String]) extends Consumer(topics) {
// Whitelist filter built from the comma-joined topic names.
private val filterSpec = new Whitelist(topics.mkString(","))
// Decoders are protected vals so that subclasses (see the companion's apply) can override them.
protected val keyDecoder: Decoder[Array[Byte]] = new DefaultDecoder()
protected val valueDecoder: Decoder[Array[Byte]] = new DefaultDecoder()
// Both members are lazy: the connector and the stream are only created on first use,
// after any subclass overrides of the decoders have been initialised.
private lazy val consumer = KafkaConsumer.create(config)
// One stream is requested and the first one is taken.
// NOTE(review): `.get(0)` presumably resolves through the JavaConversions import — TODO confirm.
private lazy val stream = consumer.createMessageStreamsByFilter(filterSpec, 1, keyDecoder, valueDecoder).get(0)
// Builds the stream recursively: the head is the next message decoded with
// `new String(bytes)` (platform default charset); the tail is passed by name to
// Stream.cons, so the recursive read() call is only evaluated on demand.
// NOTE(review): presumably every `stream.head` pulls a new message from the underlying
// Kafka stream — verify against KafkaStream's iterator semantics.
def read(): Stream[String] = Stream.cons(new String(stream.head.message()), read())
}
// Companion adding a factory that lets callers supply their own key/value decoders.
object StreamConsumer {
// Returns an anonymous subclass of StreamConsumer whose decoders are replaced by the
// given ones. The return type is inferred (no explicit annotation in the source).
def apply(topics: List[String], kDecoder: Decoder[Array[Byte]], vDecoder: Decoder[Array[Byte]]) = new StreamConsumer(topics) {
// Overrides of the protected vals declared in StreamConsumer.
override val keyDecoder = kDecoder
override val valueDecoder = vDecoder
}
}
// Consumer bound to exactly one topic, exposing its messages as a Stream[String].
// NOTE(review): `Consumer` is a base class defined elsewhere (kafka's Consumer is
// imported as KafkaConsumer); `config` is presumably inherited from it — TODO confirm.
case class SingleTopicConsumer(topic: String) extends Consumer(List(topic)) {
// Created lazily on first use.
private lazy val consumer = KafkaConsumer.create(config)
// Number of streams requested for the topic.
val threadNum = 1
// Map from topic name to the list of streams created for it.
private lazy val consumerMap = consumer.createMessageStreams(Map(topic -> threadNum))
// Takes the first stream of this topic; `.head` on the empty fallback list throws
// if the topic is missing from the map.
private lazy val stream = consumerMap.getOrElse(topic, List()).head
// Head is the next message decoded with `new String(bytes)` (platform default
// charset); the tail is by-name in Stream.cons, so read() recurses only on demand.
override def read(): Stream[String] = Stream.cons(new String(stream.head.message()), read())
}
开发者ID:alonsoir,项目名称:hello-kafka-twitter-scala,代码行数:36,代码来源:StreamConsumer.scala
示例7: KafkaConsumer
//设置package包名称以及导入依赖的类
package services.kafka.consumer
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException, Whitelist}
import kafka.serializer.DefaultDecoder
import play.api.Logger
/**
 * Wrapper around Kafka's ZooKeeper-based consumer connector that reads messages from
 * the topics matching `topic` and returns them one at a time as strings, logging
 * through Play's `Logger`.
 *
 * The connector is created eagerly in the constructor; call [[close]] when done.
 *
 * @param topic            topic name, used as a whitelist filter
 * @param groupId          consumer group id (`group.id`)
 * @param zookeeperConnect ZooKeeper connection string (`zookeeper.connect`)
 */
class KafkaConsumer(topic: String, groupId: String, zookeeperConnect: String) {
  import java.nio.charset.StandardCharsets

  private val props = new Properties()
  props.put("group.id", groupId)
  props.put("zookeeper.connect", zookeeperConnect)
  props.put("auto.offset.reset", "smallest")
  //2 minute consumer timeout
  props.put("consumer.timeout.ms", "120000")
  //commit after each 10 second
  props.put("auto.commit.interval.ms", "10000")

  private val config = new ConsumerConfig(props)
  private val connector = Consumer.create(config)
  private val filterSpec = new Whitelist(topic)
  // A single stream is requested, so index 0 is the only one available.
  private val streams =
    connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())(0)

  lazy val iterator = streams.iterator()

  /**
   * Reads the next message, if any.
   *
   * @return `Some(text)` with the payload decoded as UTF-8, or `None` when no message
   *         arrived before the consumer timeout, the payload was null, or an exception
   *         was thrown (it is logged at error level)
   */
  def read(): Option[String] =
    try {
      if (hasNext()) {
        Logger.info("Getting message from queue.............")
        // Option(...) guards against null payloads; the charset is explicit so the
        // result does not depend on the JVM's platform default encoding.
        Option(iterator.next().message()).map(bytes => new String(bytes, StandardCharsets.UTF_8))
      } else {
        None
      }
    } catch {
      case ex: Exception =>
        // Logged through the same Logger as hasNext() instead of printStackTrace,
        // so the failure ends up in the application log.
        Logger.error("Getting error when reading message ", ex)
        None
    }

  /** True when a message is available; false on consumer timeout or any other exception. */
  private def hasNext(): Boolean =
    try
      iterator.hasNext()
    catch {
      case _: ConsumerTimeoutException =>
        false
      case ex: Exception =>
        Logger.error("Getting error when reading message ", ex)
        false
    }

  /** Shuts down the underlying consumer connector. */
  def close(): Unit = connector.shutdown()
}
开发者ID:satendrakumar,项目名称:play-kafka-example,代码行数:59,代码来源:KafkaConsumer.scala
示例8: KafkaConsumer
//设置package包名称以及导入依赖的类
package controllers
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException, Whitelist}
import kafka.serializer.DefaultDecoder
/**
 * Wrapper around Kafka's ZooKeeper-based consumer connector that reads messages from
 * the topics matching `topic` and returns them one at a time as strings.
 *
 * The connector is created eagerly in the constructor; call [[close]] when done.
 *
 * @param topic            topic name, used as a whitelist filter
 * @param groupId          consumer group id (`group.id`)
 * @param zookeeperConnect ZooKeeper connection string (`zookeeper.connect`)
 */
class KafkaConsumer(topic: String, groupId: String, zookeeperConnect: String) {

  private val props = new Properties()
  props.put("group.id", groupId)
  props.put("zookeeper.connect", zookeeperConnect)
  props.put("auto.offset.reset", "smallest")
  //2 minute consumer timeout
  props.put("consumer.timeout.ms", "120000")
  //commit after each 10 second
  props.put("auto.commit.interval.ms", "10000")

  private val config = new ConsumerConfig(props)
  private val connector = Consumer.create(config)
  private val filterSpec = new Whitelist(topic)
  // A single stream is requested, so index 0 is the only one available.
  private val streams =
    connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())(0)

  lazy val iterator = streams.iterator()

  /**
   * Reads the next message, if any.
   *
   * @return `Some(text)` with the payload decoded as UTF-8, or `None` when no message
   *         arrived before the consumer timeout, the payload was null, or an exception
   *         was thrown (its stack trace is printed)
   */
  def read(): Option[String] =
    try {
      if (!hasNext()) {
        None
      } else {
        println("Getting message from queue.............")
        val message = iterator.next().message()
        // A null payload maps to None directly instead of via a caught
        // NullPointerException; UTF-8 is explicit so decoding does not depend on
        // the JVM's platform default charset.
        if (message == null) None
        else Some(new String(message, java.nio.charset.StandardCharsets.UTF_8))
      }
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        None
    }

  /** True when a message is available; false on consumer timeout or any other exception. */
  private def hasNext(): Boolean =
    try
      iterator.hasNext()
    catch {
      case _: ConsumerTimeoutException =>
        false
      case ex: Exception =>
        ex.printStackTrace()
        println("Getting error when reading message ")
        false
    }

  /** Shuts down the underlying consumer connector. */
  def close(): Unit = connector.shutdown()
}
开发者ID:krishnangc,项目名称:kafka-scala-quartz,代码行数:59,代码来源:KafkaConsumer.scala
注：本文中的kafka.serializer.DefaultDecoder类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论