Scala MessageAndMetadata Class Code Examples


This article collects typical usage examples of the kafka.message.MessageAndMetadata class in Scala. If you are wondering what MessageAndMetadata is for, how to use it, or what working examples look like, the selected class code examples below should help.



Three code examples of the MessageAndMetadata class are shown below, sorted by popularity by default.
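For orientation, here is a minimal sketch of the fields a consumer typically reads from a MessageAndMetadata record, based on the legacy kafka.message consumer API; the helper name is illustrative and not taken from the examples below:

//Illustrative only: print the metadata and payload of a consumed record
import kafka.message.MessageAndMetadata

def describe(mmd: MessageAndMetadata[Array[Byte], Array[Byte]]): Unit = {
  // topic, partition and offset identify where the record came from
  println(s"topic=${mmd.topic} partition=${mmd.partition} offset=${mmd.offset}")
  val key: Array[Byte] = mmd.key()        // decoded key (may be null)
  val value: Array[Byte] = mmd.message()  // decoded payload
  println(s"key=${Option(key).map(_.length).getOrElse(0)} bytes, value=${value.length} bytes")
}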

Example 1:

//Package name and imported dependencies
package teleporter.integration

import javax.sql.DataSource

import kafka.message.MessageAndMetadata
import org.apache.kafka.clients.producer.ProducerRecord
import org.elasticsearch.action.ActionRequest
import org.mongodb.scala.Document
import teleporter.integration.component.hbase.{HbaseAction, HbaseOut}
import teleporter.integration.component.jdbc.Action
import teleporter.integration.component.kudu.KuduAction


package object component {
  type KafkaMessage = MessageAndMetadata[Array[Byte], Array[Byte]]
  type KafkaRecord = ProducerRecord[Array[Byte], Array[Byte]]
  type JdbcMessage = Map[String, Any]
  type JdbcRecord = Seq[Action]
  type JdbcFunction = DataSource => Unit
  type ElasticRecord = ActionRequest[_ <: AnyRef]
  type MongoMessage = Document
  type KuduRecord = KuduAction
  type HbaseRecord = HbaseAction
  type HbaseResult = HbaseOut
} 
Developer ID: huanwuji, Project: teleporter, Lines of code: 26, Source file: package.scala
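As a quick illustration of how the KafkaMessage and KafkaRecord aliases above might be used together, here is a hedged sketch; the forward helper is hypothetical and not part of the teleporter project:

//Hypothetical helper: re-publish a consumed KafkaMessage to another topic as a KafkaRecord,
//using the aliases defined in the package object above
import org.apache.kafka.clients.producer.ProducerRecord
import teleporter.integration.component.{KafkaMessage, KafkaRecord}

def forward(msg: KafkaMessage, targetTopic: String): KafkaRecord =
  new ProducerRecord[Array[Byte], Array[Byte]](targetTopic, msg.key(), msg.message())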


Example 2: FbsKafka

//Package name and imported dependencies
package teleporter.integration.protocol.fbs

import java.nio.ByteBuffer

import com.google.flatbuffers.FlatBufferBuilder
import kafka.message.MessageAndMetadata
import org.apache.kafka.clients.producer.ProducerRecord
import teleporter.integration.core.{AckMessage, TId, TransferMessage}
import teleporter.integration.protocol.fbs.generate.{FbsKafkaRecord, FbsKafkaRecords, JdbcMessages}

import scala.collection.immutable.Seq


object FbsKafka {
  def apply(record: generate.FbsKafkaRecord): TransferMessage[ProducerRecord[Array[Byte], Array[Byte]]] = {
    val tId = TId.keyFromBytes(Array.tabulate(record.tIdLength())(record.tId))
    val key = Array.tabulate(record.keyLength())(record.key)
    val data = Array.tabulate(record.dataLength())(record.data)
    val kafkaRecord = new ProducerRecord[Array[Byte], Array[Byte]](record.topic(), record.partition(), key, data)
    TransferMessage[ProducerRecord[Array[Byte], Array[Byte]]](id = tId, data = kafkaRecord)
  }

  def apply(byteBuffer: ByteBuffer): Seq[TransferMessage[ProducerRecord[Array[Byte], Array[Byte]]]] = {
    val records = FbsKafkaRecords.getRootAsFbsKafkaRecords(byteBuffer)
    scala.collection.immutable.Seq.tabulate(records.recordsLength())(records.records).map(apply)
  }

  def apply(bytes: Array[Byte]): Seq[TransferMessage[ProducerRecord[Array[Byte], Array[Byte]]]] = {
    val records = FbsKafkaRecords.getRootAsFbsKafkaRecords(ByteBuffer.wrap(bytes))
    scala.collection.immutable.Seq.tabulate(records.recordsLength())(records.records).map(apply)
  }

  def unapply(message: AckMessage[_, MessageAndMetadata[Array[Byte], Array[Byte]]], builder: FlatBufferBuilder): Int = {
    val kafkaData = message.data
    val tId = FbsKafkaRecord.createTIdVector(builder, message.id.toBytes)
    val topic = builder.createString(kafkaData.topic)
    val key = if (kafkaData.key() == null) 0 else FbsKafkaRecord.createKeyVector(builder, kafkaData.key())
    val data = FbsKafkaRecord.createDataVector(builder, kafkaData.message())
    FbsKafkaRecord.createFbsKafkaRecord(builder, tId, topic, key, kafkaData.partition, data)
  }

  def unapply(messages: Seq[AckMessage[_, MessageAndMetadata[Array[Byte], Array[Byte]]]], initialCapacity: Int): FlatBufferBuilder = {
    val builder = new FlatBufferBuilder(initialCapacity)
    val records = JdbcMessages.createMessagesVector(builder, messages.map(unapply(_, builder)).toArray)
    val root = JdbcMessages.createJdbcMessages(builder, records)
    builder.finish(root)
    builder
  }
} 
Developer ID: huanwuji, Project: teleporter, Lines of code: 50, Source file: FbsKafka.scala
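A hedged sketch of the decode path defined above: given a byte array holding a serialized FbsKafkaRecords buffer (for example, the output of FbsKafka.unapply(...).sizedByteArray()), FbsKafka.apply turns it back into producer records. The printDecoded helper is illustrative, not part of the project:

//Illustrative decode of a serialized FbsKafkaRecords buffer; serializedBytes is assumed
//to contain valid FlatBuffers data produced elsewhere
import teleporter.integration.protocol.fbs.FbsKafka

def printDecoded(serializedBytes: Array[Byte]): Unit = {
  val transfers = FbsKafka(serializedBytes) // Seq[TransferMessage[ProducerRecord[Array[Byte], Array[Byte]]]]
  transfers.foreach(t => println(s"${t.id} -> topic=${t.data.topic}, ${t.data.value().length} bytes"))
}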


Example 3: KafkaSource

//Package name and imported dependencies
package com.ippontech.kafka

import com.ippontech.kafka.stores.OffsetsStore
import com.typesafe.scalalogging.slf4j.LazyLogging
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

import scala.reflect.ClassTag

object KafkaSource extends LazyLogging {

  def kafkaStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag]
  (ssc: StreamingContext, kafkaParams: Map[String, String], offsetsStore: OffsetsStore, topic: String): InputDStream[(K, V)] = {

    val topics = Set(topic)

    val storedOffsets = offsetsStore.readOffsets(topic)
    val kafkaStream = storedOffsets match {
      case None =>
        // start from the latest offsets
        KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaParams, topics)
      case Some(fromOffsets) =>
        // start from previously saved offsets
        val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
        KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](ssc, kafkaParams, fromOffsets, messageHandler)
    }

    // save the offsets
    kafkaStream.foreachRDD(rdd => offsetsStore.saveOffsets(topic, rdd))

    kafkaStream
  }

  // Kafka input stream
  def kafkaStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag]
  (ssc: StreamingContext, brokers: String, offsetsStore: OffsetsStore, topic: String): InputDStream[(K, V)] =
    kafkaStream(ssc, Map("metadata.broker.list" -> brokers), offsetsStore, topic)

} 
Developer ID: ippontech, Project: spark-kafka-source, Lines of code: 43, Source file: KafkaSource.scala
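A hedged usage sketch for the broker-list overload above, assuming String keys and values; the application name, broker address, topic name, and the OffsetsStore placeholder are illustrative:

//Illustrative driver; offsetsStore is a placeholder for any concrete OffsetsStore
//implementation (e.g. one backed by ZooKeeper)
import com.ippontech.kafka.KafkaSource
import com.ippontech.kafka.stores.OffsetsStore
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val offsetsStore: OffsetsStore = ???
val ssc = new StreamingContext(new SparkConf().setAppName("kafka-source-demo"), Seconds(5))

val stream = KafkaSource.kafkaStream[String, String, StringDecoder, StringDecoder](
  ssc, "broker1:9092", offsetsStore, "events")
stream.map { case (_, value) => value }.print()

ssc.start()
ssc.awaitTermination()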



Note: the kafka.message.MessageAndMetadata class examples in this article were collected from source code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by their authors; copyright remains with the original authors, and redistribution or use should follow each project's license. Please do not republish without permission.

