• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala DefaultDecoder类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中kafka.serializer.DefaultDecoder的典型用法代码示例。如果您正苦于以下问题:Scala DefaultDecoder类的具体用法?Scala DefaultDecoder怎么用?Scala DefaultDecoder使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了DefaultDecoder类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: StreamingApp

//设置package包名称以及导入依赖的类
package spark.test

import data.processing.avro.AvroDecoder
import kafka.serializer.StringDecoder
import kafka.serializer.DefaultDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils


object StreamingApp {
  /**
   * Entry point: consumes Avro-encoded records from the Kafka topic "test"
   * via a direct (receiver-less) stream and prints each decoded (key, value)
   * pair, in 1-second micro-batches.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Streaming Application")
    val ssc = new StreamingContext(conf, Seconds(1))

    val topicsSet = "test".split(",").toSet
    // Direct stream talks to brokers directly; only the broker list is needed.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

    // String keys, raw byte-array values (Avro payload decoded downstream).
    val directKafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, topicsSet
    )

    directKafkaStream.foreachRDD(rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // One decoder per partition: built on the executor, so it does not
        // need to be serializable, and it is reused for the whole partition.
        val avroDecoder = new AvroDecoder("/event-record.json")
        // foreach instead of map(...).foreach(...): this is a pure side effect,
        // and Iterator.map is lazy — it only ran because foreach forced it.
        partitionOfRecords.foreach(m => println((m._1, avroDecoder.decode(m._2))))
      })

    ssc.start()
    ssc.awaitTermination()
  }
}
开发者ID:ipogudin,项目名称:data-processing-examples,代码行数:36,代码来源:StreamingApp.scala


示例2: KafkaPayload

//设置package包名称以及导入依赖的类
package tools

import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Thin wrapper around a raw Kafka message value.
// NOTE(review): Array[Byte] compares by reference, so the generated case-class
// equals/hashCode use payload identity, not content — confirm no caller relies
// on structural equality of KafkaPayload instances.
case class KafkaPayload(value: Array[Byte])

/** Factory for byte-oriented Kafka DStreams driven by the supplied consumer config. */
class KafkaDStreamSource(config: Map[String, String]) {

  /** Opens a direct stream on `topic` and wraps each message value in a KafkaPayload. */
  def createSource(ssc: StreamingContext, topic: String): DStream[KafkaPayload] = {
    // Direct (receiver-less) stream; both keys and values arrive as raw byte arrays.
    val rawStream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
      ssc,
      config,
      Set(topic))
    // Keys are discarded; only the message value is propagated downstream.
    rawStream.map(keyAndValue => KafkaPayload(keyAndValue._2))
  }

}

object KafkaDStreamSource {
  // Convenience factory: KafkaDStreamSource(config) instead of `new KafkaDStreamSource(config)`.
  def apply(config: Map[String, String]): KafkaDStreamSource = new KafkaDStreamSource(config)
}
开发者ID:Antwnis,项目名称:kafka-streaming-examples,代码行数:29,代码来源:KafkaDStreamSource.scala


示例3: KafkaConsumer

//设置package包名称以及导入依赖的类
package Services



import java.util.Properties

import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException, Whitelist}
import kafka.serializer.DefaultDecoder


/**
 * Blocking high-level Kafka consumer over ZooKeeper.
 *
 * Starts from the earliest available offset ("smallest") and gives up on a
 * poll after 500 ms, so read() returns None instead of blocking forever on
 * an empty topic. Connects to the broker eagerly at construction time.
 */
class KafkaConsumer(topic: String, groupId: String, zookeeperConnect: String) {


  private val props = new Properties()

  props.put("group.id", groupId)
  props.put("zookeeper.connect", zookeeperConnect)
  props.put("auto.offset.reset", "smallest")
  // Short timeout: hasNext() treats a timeout as "no message available".
  props.put("consumer.timeout.ms", "500")
  props.put("auto.commit.interval.ms", "500")

  private val config = new ConsumerConfig(props)
  private val connector = Consumer.create(config)
  private val filterSpec = new Whitelist(topic)
  // One stream over everything matching the whitelist; raw bytes for key and value.
  private val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())(0)

  lazy val iterator = streams.iterator()

  /**
   * Reads the next message, decoded with the platform default charset.
   * Returns None when no message is available or decoding/consumption fails.
   */
  def read(): Option[String] =
    try {
      if (hasNext) {
        println("Getting message from queue.............")
        val message = iterator.next().message()
        Some(new String(message))
      } else {
        None
      }
    } catch {
      // Exception, not Throwable: catching Throwable would also swallow fatal
      // JVM errors (OutOfMemoryError etc.) — let those propagate.
      case ex: Exception =>
        ex.printStackTrace()
        None
    }

  // True when a message is buffered; a ConsumerTimeoutException just means
  // "nothing arrived within consumer.timeout.ms".
  private def hasNext(): Boolean =
    try
      iterator.hasNext()
    catch {
      case timeOutEx: ConsumerTimeoutException =>
        false
      case ex: Exception =>
        ex.printStackTrace()
        println("Getting error when reading message ")
        false
    }

  /** Shuts down the underlying connector; the instance is unusable afterwards. */
  def close(): Unit = connector.shutdown()

}
开发者ID:pranjut,项目名称:consul-kafka-microservices,代码行数:58,代码来源:KafkaConsumer.scala


示例4: KafkaConsumer

//设置package包名称以及导入依赖的类
package com.knoldus.kafka.consumer

import java.util.Properties

import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException, Whitelist}
import kafka.serializer.DefaultDecoder


/**
 * Blocking high-level Kafka consumer over ZooKeeper.
 *
 * Consumes `topic` from the earliest available offset, auto-committing every
 * 10 seconds; a poll gives up after 2 minutes, at which point read() yields None.
 */
class KafkaConsumer(topic: String, groupId: String, zookeeperConnect: String) {


  private val consumerProps = new Properties()

  consumerProps.put("group.id", groupId)
  consumerProps.put("zookeeper.connect", zookeeperConnect)
  consumerProps.put("auto.offset.reset", "smallest")
  // 2-minute consumer timeout
  consumerProps.put("consumer.timeout.ms", "120000")
  // commit offsets every 10 seconds
  consumerProps.put("auto.commit.interval.ms", "10000")

  private val consumerConfig = new ConsumerConfig(consumerProps)
  private val connector = Consumer.create(consumerConfig)
  private val topicFilter = new Whitelist(topic)
  // Single whitelist stream; keys and values are delivered as raw bytes.
  private val streams = connector.createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder(), new DefaultDecoder())(0)

  lazy val iterator = streams.iterator()

  /** Next message as a String (default charset), or None if unavailable/failed. */
  def read(): Option[String] =
    try {
      if (!hasNext) {
        None
      } else {
        println("Getting message from queue.............")
        Some(new String(iterator.next().message()))
      }
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        None
    }

  // A ConsumerTimeoutException simply means nothing arrived in time.
  private def hasNext(): Boolean =
    try iterator.hasNext()
    catch {
      case _: ConsumerTimeoutException =>
        false
      case ex: Exception =>
        ex.printStackTrace()
        println("Getting error when reading message ")
        false
    }

  /** Shuts down the connector; the consumer cannot be reused afterwards. */
  def close(): Unit = connector.shutdown()

}
开发者ID:knoldus,项目名称:activator-kafka-scala-producer-consumer.g8,代码行数:59,代码来源:KafkaConsumer.scala


示例5: createStream

//设置package包名称以及导入依赖的类
package it.agilelab.bigdata.wasp.consumers.readers

import it.agilelab.bigdata.wasp.core.WaspSystem
import it.agilelab.bigdata.wasp.core.WaspSystem._
import it.agilelab.bigdata.wasp.core.kafka.CheckOrCreateTopic
import it.agilelab.bigdata.wasp.core.logging.WaspLogger
import it.agilelab.bigdata.wasp.core.models.{DefaultConfiguration, TopicModel}
import it.agilelab.bigdata.wasp.core.utils.{AvroToJsonUtil, ConfigManager, JsonToByteArrayUtil}
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils



  //TODO: check warning (not understood)
  /**
   * Creates a Kafka receiver-based DStream of decoded message bodies for `topic`.
   *
   * Asks the Kafka admin actor to verify (or create) the topic first; if that
   * check fails, logs and throws. Message values are converted to JSON strings
   * according to the topic's declared data type.
   *
   * @param group consumer group id to register under
   * @param topic topic model carrying name, partitioning and data-type info
   * @param ssc   implicit streaming context the receiver is attached to
   */
  def createStream(group: String, topic: TopicModel)(implicit ssc: StreamingContext): DStream[String] = {
    val kafkaConfig = ConfigManager.getKafkaConfig

    val kafkaConfigMap: Map[String, String] = Map(
      "zookeeper.connect" -> kafkaConfig.zookeeper.toString,
      "zookeeper.connection.timeout.ms" -> kafkaConfig.zookeeper.timeout.getOrElse(DefaultConfiguration.timeout).toString
    )

    // Synchronous ask (??) to the admin actor: ensure the topic exists before subscribing.
    if (??[Boolean](WaspSystem.getKafkaAdminActor, CheckOrCreateTopic(topic.name, topic.partitions, topic.replicas))) {
      // Receiver-based stream: String keys, byte-array values, replicated storage.
      // NOTE(review): the hard-coded 3 is the number of consumer threads for this
      // topic — presumably tuned for this deployment; confirm before changing.
      val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
        ssc,
        kafkaConfigMap + ("group.id" -> group),
        Map(topic.name -> 3),
        StorageLevel.MEMORY_AND_DISK_2
      )

      // Decode values to JSON strings; keys are dropped after the first map.
      // Unknown data types deliberately fall back to the avro branch.
      topic.topicDataType match {
        case "avro" => receiver.map(x => (x._1, AvroToJsonUtil.avroToJson(x._2))).map(_._2)
        case "json" => receiver.map(x => (x._1, JsonToByteArrayUtil.byteArrayToJson(x._2))).map(_._2)
        case _ => receiver.map(x => (x._1, AvroToJsonUtil.avroToJson(x._2))).map(_._2)
      }

    } else {
      logger.error(s"Topic not found on Kafka: $topic")
      throw new Exception(s"Topic not found on Kafka: $topic")
    }
  }
} 
开发者ID:agile-lab-dev,项目名称:wasp,代码行数:47,代码来源:KafkaReader.scala


示例6: StreamConsumer

//设置package包名称以及导入依赖的类
package example.consumer

import kafka.consumer.{Consumer => KafkaConsumer, ConsumerIterator, Whitelist}
import kafka.serializer.{DefaultDecoder, Decoder}
import scala.collection.JavaConversions._

// Consumes every topic in `topics` as a single merged whitelist stream.
// `config` is inherited from the Consumer base class (defined elsewhere).
case class StreamConsumer(topics: List[String]) extends Consumer(topics) {
  // topics to listen on, joined into one whitelist pattern
  private val filterSpec = new Whitelist(topics.mkString(","))

  // protected + non-final so the companion's apply() can override the decoders
  // in an anonymous subclass; both default to raw byte pass-through.
  protected val keyDecoder: Decoder[Array[Byte]] = new DefaultDecoder()
  protected val valueDecoder: Decoder[Array[Byte]] = new DefaultDecoder()

  // lazy: the broker connection is only opened on first read()
  private lazy val consumer = KafkaConsumer.create(config)
  private lazy val stream = consumer.createMessageStreamsByFilter(filterSpec, 1, keyDecoder, valueDecoder).get(0)

  // Infinite lazy stream; each forced element blocks for the next Kafka message.
  // NOTE(review): holding a reference to the Stream head retains every message
  // read so far — callers should iterate without keeping the head.
  def read(): Stream[String] = Stream.cons(new String(stream.head.message()), read())
}

object StreamConsumer {
  // Factory that swaps in caller-supplied key/value decoders via an anonymous
  // subclass. Works because the class's lazy `stream` reads the decoders only
  // on first use, after the overriding vals are initialized.
  def apply(topics: List[String], kDecoder: Decoder[Array[Byte]], vDecoder: Decoder[Array[Byte]]) = new StreamConsumer(topics) {
    override val keyDecoder = kDecoder
    override val valueDecoder = vDecoder
  }
}

// Consumes exactly one topic with a single consumer thread.
// `config` is inherited from the Consumer base class (defined elsewhere).
case class SingleTopicConsumer(topic: String) extends Consumer(List(topic)) {
  val threadNum = 1

  // lazy: no broker connection is opened until the stream is first forced
  private lazy val consumer = KafkaConsumer.create(config)
  private lazy val streamsByTopic = consumer.createMessageStreams(Map(topic -> threadNum))
  private lazy val topicStream = streamsByTopic.getOrElse(topic, List()).head

  // Infinite lazy stream; forcing an element blocks for the next Kafka message.
  override def read(): Stream[String] =
    new String(topicStream.head.message()) #:: read()
}
开发者ID:alonsoir,项目名称:hello-kafka-twitter-scala,代码行数:36,代码来源:StreamConsumer.scala


示例7: KafkaConsumer

//设置package包名称以及导入依赖的类
package services.kafka.consumer

import java.util.Properties

import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException, Whitelist}
import kafka.serializer.DefaultDecoder
import play.api.Logger


/**
 * Blocking high-level Kafka consumer over ZooKeeper, logging through Play's Logger.
 *
 * Consumes `topic` from the earliest offset, auto-committing every 10 seconds;
 * polls time out after 2 minutes, in which case read() returns None.
 */
class KafkaConsumer(topic: String, groupId: String, zookeeperConnect: String) {


  private val settings = new Properties()

  settings.put("group.id", groupId)
  settings.put("zookeeper.connect", zookeeperConnect)
  settings.put("auto.offset.reset", "smallest")
  // 2-minute consumer timeout
  settings.put("consumer.timeout.ms", "120000")
  // commit offsets every 10 seconds
  settings.put("auto.commit.interval.ms", "10000")

  private val consumerConfig = new ConsumerConfig(settings)
  private val connector = Consumer.create(consumerConfig)
  private val whitelist = new Whitelist(topic)
  // Single whitelist stream; keys and values arrive as raw bytes.
  private val streams = connector.createMessageStreamsByFilter(whitelist, 1, new DefaultDecoder(), new DefaultDecoder())(0)

  lazy val iterator = streams.iterator()

  /** Next message as a String (default charset), or None if unavailable/failed. */
  def read(): Option[String] =
    try {
      if (!hasNext) {
        None
      } else {
        Logger.info("Getting message from queue.............")
        Some(new String(iterator.next().message()))
      }
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        None
    }

  // A ConsumerTimeoutException just means nothing arrived within the timeout.
  private def hasNext(): Boolean =
    try iterator.hasNext()
    catch {
      case _: ConsumerTimeoutException =>
        false
      case ex: Exception =>
        Logger.error("Getting error when reading message ",ex)
        false
    }

  /** Shuts down the connector; the consumer cannot be reused afterwards. */
  def close(): Unit = connector.shutdown()

}
开发者ID:satendrakumar,项目名称:play-kafka-example,代码行数:59,代码来源:KafkaConsumer.scala


示例8: KafkaConsumer

//设置package包名称以及导入依赖的类
package controllers

import java.util.Properties

import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException, Whitelist}
import kafka.serializer.DefaultDecoder


/**
 * Blocking high-level Kafka consumer over ZooKeeper.
 *
 * Reads `topic` from the earliest available offset with a 2-minute poll
 * timeout and 10-second auto-commit interval; read() yields None on timeout.
 */
class KafkaConsumer(topic: String, groupId: String, zookeeperConnect: String) {


  private val properties = new Properties()

  properties.put("group.id", groupId)
  properties.put("zookeeper.connect", zookeeperConnect)
  properties.put("auto.offset.reset", "smallest")
  // 2-minute consumer timeout
  properties.put("consumer.timeout.ms", "120000")
  // commit offsets every 10 seconds
  properties.put("auto.commit.interval.ms", "10000")

  private val conf = new ConsumerConfig(properties)
  private val connector = Consumer.create(conf)
  private val topicWhitelist = new Whitelist(topic)
  // One stream over the whitelist; both key and value are raw byte arrays.
  private val streams = connector.createMessageStreamsByFilter(topicWhitelist, 1, new DefaultDecoder(), new DefaultDecoder())(0)

  lazy val iterator = streams.iterator()

  /** Next message as a String (default charset), or None if unavailable/failed. */
  def read(): Option[String] =
    try {
      if (!hasNext) {
        None
      } else {
        println("Getting message from queue.............")
        val body = iterator.next().message()
        Some(new String(body))
      }
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        None
    }

  // Timeout simply means no message arrived within consumer.timeout.ms.
  private def hasNext(): Boolean =
    try iterator.hasNext()
    catch {
      case _: ConsumerTimeoutException =>
        false
      case ex: Exception =>
        ex.printStackTrace()
        println("Getting error when reading message ")
        false
    }

  /** Shuts down the connector; the consumer cannot be reused afterwards. */
  def close(): Unit = connector.shutdown()

}
开发者ID:krishnangc,项目名称:kafka-scala-quartz,代码行数:59,代码来源:KafkaConsumer.scala



注:本文中的kafka.serializer.DefaultDecoder类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Hashing类代码示例发布时间:2022-05-23
下一篇:
Scala ZipOutputStream类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2023 极客世界.|Sitemap