This article collects typical usage examples of the Scala class kafka.serializer.Decoder. If you are unsure what the Decoder class does or how to use it, the curated examples below should help.
Five code examples of the Decoder class are presented, drawn from open-source projects and sorted by popularity by default.
Example 1: AvroFlumeEventDecoder
// Set the package name and import the dependent classes
package contrib.kafka.serializer

import java.io.ByteArrayInputStream
import java.util.{Map => JMap}

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
import org.apache.avro.specific.SpecificDatumReader

import org.apache.flume.Event
import org.apache.flume.event.EventBuilder
import org.apache.flume.source.avro.AvroFlumeEvent

class AvroFlumeEventDecoder(props: VerifiableProperties = null)
    extends Decoder[Event] {

  private val reader: SpecificDatumReader[AvroFlumeEvent] =
    new SpecificDatumReader[AvroFlumeEvent](classOf[AvroFlumeEvent])

  // Reused across calls so DecoderFactory can recycle the underlying buffer.
  private var decoder: BinaryDecoder = null

  override def fromBytes(bytes: Array[Byte]): Event = {
    val inputStream = new ByteArrayInputStream(bytes)
    decoder = DecoderFactory.get.directBinaryDecoder(inputStream, decoder)
    val avroEvent: AvroFlumeEvent = reader.read(null, decoder)
    EventBuilder.withBody(
      avroEvent.getBody.array,
      toStringJavaMap(avroEvent.getHeaders))
  }

  // Avro hands back CharSequence keys/values; Flume expects plain Strings.
  def toStringJavaMap(
      charSeqMap: JMap[CharSequence, CharSequence]): JMap[String, String] = {
    import scala.collection.JavaConversions._
    for ((k: CharSequence, v: CharSequence) <- charSeqMap)
      yield (k.toString, v.toString)
  }
}

Author: saikocat | Project: spark-sql-kafka-avroflumeevent | Lines: 40 | Source: AvroFlumeEventDecoder.scala
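To see the decoder in action, here is a minimal usage sketch (our addition, not from the original project); `rawBytes` is an assumed placeholder for an Avro-encoded AvroFlumeEvent payload taken from a Kafka message:

val decoder = new AvroFlumeEventDecoder()
// rawBytes: Array[Byte] — an Avro-serialized AvroFlumeEvent, e.g. a Kafka message payload (assumed)
val event: Event = decoder.fromBytes(rawBytes)
println(s"body=${new String(event.getBody)} headers=${event.getHeaders}")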
Example 2: DefaultDecoder
// Set the package name and import the dependent classes
package me.jie.ksrdd

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

// Identity decoder: hands back the raw message bytes unchanged.
class DefaultDecoder(props: VerifiableProperties = null)
    extends Decoder[Array[Byte]] with Serializable {
  def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
}

// Decodes message bytes into a String; the charset is configurable
// through the "serializer.encoding" property and defaults to UTF8.
class StringDecoder(props: VerifiableProperties = null)
    extends Decoder[String] with Serializable {
  val encoding =
    if (props == null) "UTF8"
    else props.getString("serializer.encoding", "UTF8")

  def fromBytes(bytes: Array[Byte]): String = new String(bytes, encoding)
}

Author: JensenFeng | Project: KSRdd | Lines: 23 | Source: Decoder.scala
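A quick round-trip sketch (our addition): StringDecoder is configured through a VerifiableProperties built from plain java.util.Properties, exactly as a Kafka consumer would pass it in.

import java.util.Properties
import kafka.utils.VerifiableProperties

val props = new Properties()
props.put("serializer.encoding", "UTF8") // same value the decoder falls back to by default
val stringDecoder = new StringDecoder(new VerifiableProperties(props))
assert(stringDecoder.fromBytes("hello".getBytes("UTF8")) == "hello")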
Example 3: StreamConsumer
// Set the package name and import the dependent classes
package example.consumer

import kafka.consumer.{Consumer => KafkaConsumer, ConsumerIterator, Whitelist}
import kafka.serializer.{DefaultDecoder, Decoder}

import scala.collection.JavaConversions._

case class StreamConsumer(topics: List[String]) extends Consumer(topics) {
  // Topics to listen to, expressed as a whitelist filter.
  private val filterSpec = new Whitelist(topics.mkString(","))

  protected val keyDecoder: Decoder[Array[Byte]] = new DefaultDecoder()
  protected val valueDecoder: Decoder[Array[Byte]] = new DefaultDecoder()

  private lazy val consumer = KafkaConsumer.create(config)
  private lazy val stream =
    consumer.createMessageStreamsByFilter(filterSpec, 1, keyDecoder, valueDecoder).get(0)

  // Lazily materializes messages; each access to `stream.head` pulls the next one.
  def read(): Stream[String] = Stream.cons(new String(stream.head.message()), read())
}

object StreamConsumer {
  // Factory that lets callers supply their own key/value decoders.
  def apply(topics: List[String], kDecoder: Decoder[Array[Byte]], vDecoder: Decoder[Array[Byte]]) =
    new StreamConsumer(topics) {
      override val keyDecoder = kDecoder
      override val valueDecoder = vDecoder
    }
}

case class SingleTopicConsumer(topic: String) extends Consumer(List(topic)) {
  private lazy val consumer = KafkaConsumer.create(config)
  val threadNum = 1

  private lazy val consumerMap = consumer.createMessageStreams(Map(topic -> threadNum))
  private lazy val stream = consumerMap.getOrElse(topic, List()).head

  override def read(): Stream[String] = Stream.cons(new String(stream.head.message()), read())
}

Author: alonsoir | Project: hello-kafka-twitter-scala | Lines: 36 | Source: StreamConsumer.scala
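For illustration, a hedged usage sketch: it assumes the project's abstract Consumer base class supplies a valid `config` pointing at a reachable ZooKeeper/Kafka installation, and that the listed topic exists.

val consumer = StreamConsumer(List("tweets"))
// Blocks until messages arrive; take(5) forces five elements of the lazy Stream.
consumer.read().take(5).foreach(println)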
Example 4: props
// Set the package name and import the dependent classes
package com.kakao.cuesheet.convert

import java.util.Arrays.copyOfRange

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

sealed trait AvroDecoder[T] extends Decoder[T] {
  def props: VerifiableProperties

  // Schema and byte-skipping behavior are driven by consumer properties.
  protected val schema = new Schema.Parser().parse(props.getString(Avro.SCHEMA))
  protected val skipBytes = props.getInt(Avro.SKIP_BYTES, 0)

  protected val reader = new GenericDatumReader[GenericRecord](schema)
  protected val decoder = Avro.recordDecoder(reader)

  // Drops the first `size` bytes (e.g. a magic byte or schema-id prefix).
  private def skip(bytes: Array[Byte], size: Int): Array[Byte] = {
    val length = bytes.length
    length - size match {
      case remaining if remaining > 0 => copyOfRange(bytes, size, length)
      case _ => new Array[Byte](0)
    }
  }

  def parse(bytes: Array[Byte]): GenericRecord = {
    val data = if (skipBytes == 0) bytes else skip(bytes, skipBytes)
    decoder(data)
  }
}

class AvroRecordDecoder(val props: VerifiableProperties) extends AvroDecoder[GenericRecord] {
  override def fromBytes(bytes: Array[Byte]): GenericRecord = parse(bytes)
}

class AvroMapDecoder(val props: VerifiableProperties) extends AvroDecoder[Map[String, Any]] {
  override def fromBytes(bytes: Array[Byte]): Map[String, Any] = Avro.toMap(parse(bytes))
}

class AvroJsonDecoder(val props: VerifiableProperties) extends AvroDecoder[String] {
  override def fromBytes(bytes: Array[Byte]): String = Avro.toJson(parse(bytes))
}

Author: kakao | Project: cuesheet | Lines: 46 | Source: AvroDecoder.scala
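A sketch of how such a decoder might be configured (our addition; `Avro` is a project-internal helper object whose SCHEMA and SKIP_BYTES property keys appear in the code above, and `avroBytes` is an assumed placeholder for an Avro binary payload matching the schema):

import java.util.Properties
import kafka.utils.VerifiableProperties
import org.apache.avro.generic.GenericRecord

val schemaJson = """{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]}"""
val props = new Properties()
props.put(Avro.SCHEMA, schemaJson) // Avro.SCHEMA: the project's schema property key, assumed to be a String constant
val decoder = new AvroRecordDecoder(new VerifiableProperties(props))
// avroBytes: Array[Byte] — an Avro-encoded record conforming to schemaJson (assumed)
val record: GenericRecord = decoder.fromBytes(avroBytes)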
Example 5: KafkaSource
// Set the package name and import the dependent classes
package com.ippontech.kafka

import com.ippontech.kafka.stores.OffsetsStore
import com.typesafe.scalalogging.slf4j.LazyLogging
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

import scala.reflect.ClassTag

object KafkaSource extends LazyLogging {

  def kafkaStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag]
      (ssc: StreamingContext, kafkaParams: Map[String, String], offsetsStore: OffsetsStore,
       topic: String): InputDStream[(K, V)] = {

    val topics = Set(topic)

    val storedOffsets = offsetsStore.readOffsets(topic)
    val kafkaStream = storedOffsets match {
      case None =>
        // start from the latest offsets
        KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaParams, topics)
      case Some(fromOffsets) =>
        // start from previously saved offsets
        val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
        KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](ssc, kafkaParams, fromOffsets, messageHandler)
    }

    // save the offsets after each micro-batch
    kafkaStream.foreachRDD(rdd => offsetsStore.saveOffsets(topic, rdd))

    kafkaStream
  }

  // Kafka input stream: convenience overload taking a broker list
  def kafkaStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag]
      (ssc: StreamingContext, brokers: String, offsetsStore: OffsetsStore, topic: String): InputDStream[(K, V)] =
    kafkaStream(ssc, Map("metadata.broker.list" -> brokers), offsetsStore, topic)
}

Author: ippontech | Project: spark-kafka-source | Lines: 43 | Source: KafkaSource.scala
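To show how this is typically wired into a driver program, a hedged sketch (our addition): `offsetsStore` stands in for any OffsetsStore implementation the project provides, and the broker address and topic name are assumed values.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("kafka-source-demo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
// offsetsStore: an OffsetsStore instance (assumed), e.g. a ZooKeeper-backed implementation
val stream = KafkaSource.kafkaStream[String, String, StringDecoder, StringDecoder](
  ssc, "localhost:9092", offsetsStore, "events")
stream.foreachRDD(rdd => rdd.collect().foreach(println))
ssc.start()
ssc.awaitTermination()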
Note: the kafka.serializer.Decoder class examples in this article are compiled from source code and documentation platforms such as GitHub and MSDocs; the snippets are selected from open-source projects contributed by various developers. Copyright of the source code remains with the original authors; consult each project's License before redistributing or reusing it. Do not reproduce without permission.