本文整理汇总了Scala中kafka.message.MessageAndMetadata类的典型用法代码示例。如果您正苦于以下问题:Scala MessageAndMetadata类的具体用法?Scala MessageAndMetadata怎么用?Scala MessageAndMetadata使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了MessageAndMetadata类的3个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: component (package object)
//设置package包名称以及导入依赖的类
package teleporter.integration
import javax.sql.DataSource
import kafka.message.MessageAndMetadata
import org.apache.kafka.clients.producer.ProducerRecord
import org.elasticsearch.action.ActionRequest
import org.mongodb.scala.Document
import teleporter.integration.component.hbase.{HbaseAction, HbaseOut}
import teleporter.integration.component.jdbc.Action
import teleporter.integration.component.kudu.KuduAction
/**
  * Type aliases shared by the integration components, naming the message and
  * record types used for each backing system (Kafka, JDBC, Elasticsearch,
  * MongoDB, Kudu, HBase).
  */
package object component {
  /** Kafka message-with-metadata carrying a raw byte key and payload. */
  type KafkaMessage = MessageAndMetadata[Array[Byte], Array[Byte]]
  /** Kafka producer record carrying a raw byte key and payload. */
  type KafkaRecord = ProducerRecord[Array[Byte], Array[Byte]]

  /** JDBC message as a string-keyed map (presumably column name -> value; confirm with callers). */
  type JdbcMessage = Map[String, Any]
  /** A sequence of JDBC actions. */
  type JdbcRecord = Seq[Action]
  /** A side-effecting function run against a `DataSource`. */
  type JdbcFunction = DataSource => Unit

  /** Any Elasticsearch action request. */
  type ElasticRecord = ActionRequest[_ <: AnyRef]

  /** MongoDB document. */
  type MongoMessage = Document

  /** Kudu action. */
  type KuduRecord = KuduAction

  /** HBase action. */
  type HbaseRecord = HbaseAction
  /** HBase output value. */
  type HbaseResult = HbaseOut
}
开发者ID:huanwuji,项目名称:teleporter,代码行数:26,代码来源:package.scala
示例2: FbsKafka
//设置package包名称以及导入依赖的类
package teleporter.integration.protocol.fbs
import java.nio.ByteBuffer
import com.google.flatbuffers.FlatBufferBuilder
import kafka.message.MessageAndMetadata
import org.apache.kafka.clients.producer.ProducerRecord
import teleporter.integration.core.{AckMessage, TId, TransferMessage}
import teleporter.integration.protocol.fbs.generate.{FbsKafkaRecord, FbsKafkaRecords, JdbcMessages}
import scala.collection.immutable.Seq
/**
  * Conversions between FlatBuffers-encoded Kafka records (`FbsKafkaRecord` /
  * `FbsKafkaRecords`) and the transfer / ack message wrappers.
  *
  * `apply` decodes buffers into producer records; `unapply` encodes consumed
  * messages into a FlatBuffers builder.
  */
object FbsKafka {
  /**
    * Decodes one FlatBuffers record into a transfer message wrapping a producer record.
    *
    * @param record the FlatBuffers record to decode
    * @return a transfer message whose id is rebuilt from the record's tId bytes
    */
  def apply(record: generate.FbsKafkaRecord): TransferMessage[ProducerRecord[Array[Byte], Array[Byte]]] = {
    // FlatBuffers exposes vectors as (length, element-at-index) pairs; copy them out into arrays.
    val idBytes = Array.tabulate(record.tIdLength())(record.tId)
    val keyBytes = Array.tabulate(record.keyLength())(record.key)
    val payload = Array.tabulate(record.dataLength())(record.data)
    val kafkaRecord =
      new ProducerRecord[Array[Byte], Array[Byte]](record.topic(), record.partition(), keyBytes, payload)
    TransferMessage[ProducerRecord[Array[Byte], Array[Byte]]](id = TId.keyFromBytes(idBytes), data = kafkaRecord)
  }

  /**
    * Decodes every record of an `FbsKafkaRecords` root table.
    *
    * @param byteBuffer buffer positioned at an encoded `FbsKafkaRecords` root
    * @return one transfer message per encoded record, in encoded order
    */
  def apply(byteBuffer: ByteBuffer): Seq[TransferMessage[ProducerRecord[Array[Byte], Array[Byte]]]] = {
    val root = FbsKafkaRecords.getRootAsFbsKafkaRecords(byteBuffer)
    Seq.tabulate(root.recordsLength())(i => apply(root.records(i)))
  }

  /**
    * Decodes every record of an `FbsKafkaRecords` root table held in a byte array.
    *
    * @param bytes encoded `FbsKafkaRecords` buffer
    * @return one transfer message per encoded record, in encoded order
    */
  def apply(bytes: Array[Byte]): Seq[TransferMessage[ProducerRecord[Array[Byte], Array[Byte]]]] =
    apply(ByteBuffer.wrap(bytes))

  /**
    * Encodes one consumed Kafka message as an `FbsKafkaRecord` inside `builder`.
    *
    * @param message ack message wrapping the consumed Kafka message
    * @param builder the FlatBuffers builder to write into
    * @return the offset of the created record within `builder`
    */
  def unapply(message: AckMessage[_, MessageAndMetadata[Array[Byte], Array[Byte]]], builder: FlatBufferBuilder): Int = {
    val kafkaData = message.data
    val tId = FbsKafkaRecord.createTIdVector(builder, message.id.toBytes)
    val topic = builder.createString(kafkaData.topic)
    // A null key is written as offset 0 (no key vector is created for it).
    val key = if (kafkaData.key() == null) 0 else FbsKafkaRecord.createKeyVector(builder, kafkaData.key())
    val data = FbsKafkaRecord.createDataVector(builder, kafkaData.message())
    FbsKafkaRecord.createFbsKafkaRecord(builder, tId, topic, key, kafkaData.partition, data)
  }

  /**
    * Encodes a batch of consumed Kafka messages as an `FbsKafkaRecords` root table.
    *
    * The root is built with the `FbsKafkaRecords` builders so that it matches the
    * element type (`FbsKafkaRecord`) and the table read back by `apply`.
    *
    * @param messages        the messages to encode
    * @param initialCapacity initial size passed to the FlatBuffers builder
    * @return the finished builder containing the encoded batch
    */
  def unapply(messages: Seq[AckMessage[_, MessageAndMetadata[Array[Byte], Array[Byte]]]], initialCapacity: Int): FlatBufferBuilder = {
    val builder = new FlatBufferBuilder(initialCapacity)
    val recordOffsets = messages.map(unapply(_, builder)).toArray
    val records = FbsKafkaRecords.createRecordsVector(builder, recordOffsets)
    val root = FbsKafkaRecords.createFbsKafkaRecords(builder, records)
    builder.finish(root)
    builder
  }
}
开发者ID:huanwuji,项目名称:teleporter,代码行数:50,代码来源:FbsKafka.scala
示例3: KafkaSource
//设置package包名称以及导入依赖的类
package com.ippontech.kafka
import com.ippontech.kafka.stores.OffsetsStore
import com.typesafe.scalalogging.slf4j.LazyLogging
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.reflect.ClassTag
/**
  * Factory for direct Kafka input streams that resume from offsets kept in an
  * [[OffsetsStore]] and save offsets back to it for every processed RDD.
  */
object KafkaSource extends LazyLogging {

  /**
    * Creates a direct Kafka stream for a single topic.
    *
    * If the store holds offsets for `topic`, the stream starts from them;
    * otherwise the starting position is left to `KafkaUtils.createDirectStream`.
    * A `foreachRDD` action is registered that saves offsets for each RDD.
    *
    * @tparam K  key type
    * @tparam V  value type
    * @tparam KD decoder for keys
    * @tparam VD decoder for values
    * @param ssc          the streaming context
    * @param kafkaParams  Kafka parameters passed through to `createDirectStream`
    * @param offsetsStore where offsets are read from and saved to
    * @param topic        the topic to consume
    * @return the input stream of (key, value) pairs
    */
  def kafkaStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag]
  (ssc: StreamingContext, kafkaParams: Map[String, String], offsetsStore: OffsetsStore, topic: String): InputDStream[(K, V)] = {
    val stream: InputDStream[(K, V)] = offsetsStore.readOffsets(topic) match {
      case Some(fromOffsets) =>
        // start from previously saved offsets
        val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
        KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](ssc, kafkaParams, fromOffsets, messageHandler)
      case None =>
        // nothing stored yet: start from the latest offsets
        KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaParams, Set(topic))
    }

    // save the offsets
    stream.foreachRDD(rdd => offsetsStore.saveOffsets(topic, rdd))

    stream
  }

  /**
    * Creates a direct Kafka stream for a single topic from a broker list.
    *
    * Wraps `brokers` as the `metadata.broker.list` parameter and delegates to
    * the `kafkaParams` overload. Type arguments are passed explicitly because
    * the decoder types cannot be determined from the value arguments.
    *
    * @param ssc          the streaming context
    * @param brokers      value for the `metadata.broker.list` Kafka parameter
    * @param offsetsStore where offsets are read from and saved to
    * @param topic        the topic to consume
    * @return the input stream of (key, value) pairs
    */
  def kafkaStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag]
  (ssc: StreamingContext, brokers: String, offsetsStore: OffsetsStore, topic: String): InputDStream[(K, V)] =
    kafkaStream[K, V, KD, VD](ssc, Map("metadata.broker.list" -> brokers), offsetsStore, topic)
}
开发者ID:ippontech,项目名称:spark-kafka-source,代码行数:43,代码来源:KafkaSource.scala
注:本文中的kafka.message.MessageAndMetadata类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论