
Scala VerifiableProperties Class Code Examples


This article collects typical usage examples of the Scala class kafka.utils.VerifiableProperties. If you are unsure what VerifiableProperties is for, or how to use it from Scala, the curated class examples below should help.



Eight code examples of the VerifiableProperties class are presented below, sorted by popularity by default.
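Before the examples, a quick orientation: VerifiableProperties wraps a java.util.Properties object and adds typed getters with defaults, plus a verify() call that logs any supplied-but-unreferenced keys. A minimal sketch of that core API (the key names below are arbitrary placeholders, not taken from any example):

import java.util.Properties
import kafka.utils.VerifiableProperties

object VerifiablePropertiesSketch extends App {
  val raw = new Properties()
  raw.put("serializer.encoding", "UTF-8")

  val vProps = new VerifiableProperties(raw)
  val encoding = vProps.getString("serializer.encoding", "UTF8") // present -> "UTF-8"
  val retries  = vProps.getInt("my.retries", 3)                  // absent  -> default 3
  vProps.verify() // logs properties that were set but never read
}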

Example 1: SimpleConsumer

// Package declaration and imported dependencies
package com.landoop.kafka.ws.core.operations

import java.util.Properties

import com.landoop.kafka.ws.KafkaConstants
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import kafka.utils.VerifiableProperties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

object SimpleConsumer {

  // Get a simple string consumer
  def getConsumer[T](kafkaBrokers: String, maxRecords: Option[Int] = None): KafkaConsumer[T, T] = {
    val props = new Properties()
    props.put(KafkaConstants.BOOTSTRAP_SERVER, kafkaBrokers)
    props.put(KafkaConstants.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(KafkaConstants.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")
    // Only apply max.poll.records when a positive limit was supplied
    maxRecords.filter(_ > 0).foreach { max =>
      props.put(KafkaConstants.MAX_POLL_RECORDS, max.toString)
    }
    val vProps = new VerifiableProperties(props) // wrapper kept from the original; the consumer below reads the raw props

    new KafkaConsumer[T, T](props)
  }

  def createNewConsumerWithConsumerGroup(kafkaBrokers: String, group: String): KafkaConsumer[String, String] = {
    assert(group.length > 1, "Invalid group length")
    val properties = new Properties()
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, group)
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, (new StringDeserializer).getClass.getName)
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, (new StringDeserializer).getClass.getName)
    new KafkaConsumer(properties)
  }


  def getStringAvroConsumer[T](kafkaBrokers: String, schemaRegistry: String, maxRecords: Option[Int] = None): KafkaConsumer[String, T] = {
    val props = new Properties()
    props.put(KafkaConstants.BOOTSTRAP_SERVER, kafkaBrokers)
    props.put(KafkaConstants.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(KafkaConstants.VALUE_DESERIALIZER, classOf[KafkaAvroDeserializer].getCanonicalName)
    props.put(KafkaConstants.SCHEMA_REGISTRY_URL, schemaRegistry)

    // Only apply max.poll.records when a positive limit was supplied
    maxRecords.filter(_ > 0).foreach { max =>
      props.put(KafkaConstants.MAX_POLL_RECORDS, max.toString)
    }
    val vProps = new VerifiableProperties(props) // wrapper kept from the original; the consumer below reads the raw props

    new KafkaConsumer[String, T](props)
  }

} 
Developer: Landoop · Project: kafka-ws · Lines: 58 · Source: SimpleConsumer.scala
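A hypothetical invocation of the helper above, assuming a broker on localhost:9092 (both deserializers are String, so T is effectively String here):

val consumer = SimpleConsumer.getConsumer[String]("localhost:9092", maxRecords = Some(100))
// subscribe/poll as with any KafkaConsumer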


Example 2: AvroFlumeEventEncoder

// Package declaration and imported dependencies
package contrib.kafka.serializer

import kafka.serializer.Encoder
import kafka.utils.VerifiableProperties

import org.apache.avro.io.BinaryEncoder
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumWriter

import org.apache.flume.source.avro.AvroFlumeEvent

import java.io.ByteArrayOutputStream

class AvroFlumeEventEncoder(props: VerifiableProperties = null)
  extends Encoder[AvroFlumeEvent] {

  private val writer: SpecificDatumWriter[AvroFlumeEvent] =
    new SpecificDatumWriter[AvroFlumeEvent](classOf[AvroFlumeEvent])
  private var encoder: BinaryEncoder = null // reused and re-wrapped by EncoderFactory on each call
  private val tempOutStream = new ByteArrayOutputStream()

  override def toBytes(event: AvroFlumeEvent): Array[Byte] = {
    tempOutStream.reset()
    encoder = EncoderFactory.get.directBinaryEncoder(tempOutStream, encoder)
    writer.write(event, encoder)
    tempOutStream.toByteArray
  }
} 
Developer: saikocat · Project: spark-sql-kafka-avroflumeevent · Lines: 30 · Source: AvroFlumeEventEncoder.scala
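In the old kafka.producer API, an Encoder like this was wired in by class name, and Kafka instantiated it reflectively through the (props: VerifiableProperties) constructor. A sketch of that wiring (the broker address is a placeholder):

val producerProps = new java.util.Properties()
producerProps.put("metadata.broker.list", "localhost:9092")
producerProps.put("serializer.class", "contrib.kafka.serializer.AvroFlumeEventEncoder")
// kafka.producer.ProducerConfig / Producer pick up and construct the encoder from here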


Example 3: AvroFlumeEventDecoder

// Package declaration and imported dependencies
package contrib.kafka.serializer

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

import org.apache.avro.io.BinaryDecoder
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader

import org.apache.flume.Event
import org.apache.flume.event.EventBuilder
import org.apache.flume.source.avro.AvroFlumeEvent

import java.io.ByteArrayInputStream
import java.util.{Map => JMap} // missing in the original listing; JMap is used below

class AvroFlumeEventDecoder(props: VerifiableProperties = null)
  extends Decoder[Event] {

  private val reader: SpecificDatumReader[AvroFlumeEvent] =
    new SpecificDatumReader[AvroFlumeEvent](classOf[AvroFlumeEvent])
  private var decoder: BinaryDecoder = null // reused and re-wrapped by DecoderFactory on each call

  override def fromBytes(bytes: Array[Byte]): Event = {
    val inputStream = new ByteArrayInputStream(bytes)
    decoder = DecoderFactory.get.directBinaryDecoder(inputStream, decoder)
    val avroEvent: AvroFlumeEvent = reader.read(null, decoder)
    EventBuilder.withBody(
      avroEvent.getBody.array,
      toStringJavaMap(avroEvent.getHeaders))
  }

  def toStringJavaMap(
    charSeqMap: JMap[CharSequence, CharSequence]): JMap[String, String] = {

    // JavaConversions views the Java map as Scala for the comprehension and
    // implicitly converts the yielded Scala map back to java.util.Map.
    import scala.collection.JavaConversions._
    for ((k: CharSequence, v: CharSequence) <- charSeqMap)
      yield (k.toString, v.toString)
  }
} 
Developer: saikocat · Project: spark-sql-kafka-avroflumeevent · Lines: 40 · Source: AvroFlumeEventDecoder.scala
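The pair can also be round-tripped directly; a sketch assuming the Avro-generated builder on AvroFlumeEvent:

import java.nio.ByteBuffer
import org.apache.flume.source.avro.AvroFlumeEvent

val event = AvroFlumeEvent.newBuilder()
  .setHeaders(new java.util.HashMap[CharSequence, CharSequence]())
  .setBody(ByteBuffer.wrap("payload".getBytes("UTF-8")))
  .build()

val bytes   = new AvroFlumeEventEncoder().toBytes(event)
val decoded = new AvroFlumeEventDecoder().fromBytes(bytes) // org.apache.flume.Event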


Example 4: DefaultDecoder

// Package declaration and imported dependencies
package me.jie.ksrdd

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties



class DefaultDecoder(props: VerifiableProperties = null) extends Decoder[Array[Byte]] with Serializable {
  def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
}

class StringDecoder(props: VerifiableProperties = null) extends Decoder[String] with Serializable {
  val encoding =
    if(props == null)
      "UTF8"
    else
      props.getString("serializer.encoding", "UTF8")

  def fromBytes(bytes: Array[Byte]): String = {
    new String(bytes, encoding)
  }
} 
Developer: JensenFeng · Project: KSRdd · Lines: 23 · Source: Decoder.scala
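When properties are supplied, the configured encoding is honored; for example:

val props = new java.util.Properties()
props.put("serializer.encoding", "UTF-8")
val decoder = new StringDecoder(new VerifiableProperties(props))
decoder.fromBytes("héllo".getBytes("UTF-8")) // "héllo"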


Example 5: props

// Package declaration and imported dependencies
package com.kakao.cuesheet.convert

import java.util.Arrays.copyOfRange

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}


sealed trait AvroDecoder[T] extends Decoder[T] {

  def props: VerifiableProperties

  protected val schema = new Schema.Parser().parse(props.getString(Avro.SCHEMA))
  protected val skipBytes = props.getInt(Avro.SKIP_BYTES, 0)

  protected val reader = new GenericDatumReader[GenericRecord](schema)
  protected val decoder = Avro.recordDecoder(reader)

  private def skip(bytes: Array[Byte], size: Int): Array[Byte] = {
    val length = bytes.length
    length - size match {
      case remaining if remaining > 0 => copyOfRange(bytes, size, length)
      case _ => new Array[Byte](0)
    }
  }

  def parse(bytes: Array[Byte]): GenericRecord = {
    val data = if (skipBytes == 0) bytes else skip(bytes, skipBytes)
    decoder(data)
  }
}

class AvroRecordDecoder(val props: VerifiableProperties) extends AvroDecoder[GenericRecord] {
  override def fromBytes(bytes: Array[Byte]): GenericRecord = parse(bytes)
}

class AvroMapDecoder(val props: VerifiableProperties) extends AvroDecoder[Map[String, Any]] {
  override def fromBytes(bytes: Array[Byte]): Map[String, Any] = Avro.toMap(parse(bytes))
}

class AvroJsonDecoder(val props: VerifiableProperties) extends AvroDecoder[String] {
  override def fromBytes(bytes: Array[Byte]): String = Avro.toJson(parse(bytes))
} 
Developer: kakao · Project: cuesheet · Lines: 46 · Source: AvroDecoder.scala
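The skipBytes hook typically strips a framing prefix before Avro decoding, e.g. Confluent's 5-byte wire-format header (one magic byte plus a 4-byte schema id). A hedged construction sketch: Avro.SCHEMA and Avro.SKIP_BYTES are project-internal constants, assumed here to resolve to the keys "schema" and "skip.bytes", and schemaJson stands in for the writer schema's JSON:

val props = new java.util.Properties()
props.put("schema", schemaJson) // assumed key behind Avro.SCHEMA
props.put("skip.bytes", "5")    // assumed key behind Avro.SKIP_BYTES
val decoder = new AvroRecordDecoder(new VerifiableProperties(props))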


Example 6: SimplePartitioner

// Package declaration and imported dependencies
package packt.ch05

import java.util

import kafka.utils.VerifiableProperties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

object SimplePartitioner {

  private var producer: KafkaProducer[String, String] = _
}

class SimplePartitioner(props: VerifiableProperties) extends Partitioner {

  // Derive the partition from the numeric suffix after the last '.' in the
  // key (e.g. "sensor.17" -> 17 % a_numPartitions); keys without one go to 0.
  def partition(key: AnyRef, a_numPartitions: Int): Int = {
    var partition = 0
    val partitionKey = key.asInstanceOf[String]
    val offset = partitionKey.lastIndexOf('.')
    if (offset > 0) {
      partition = java.lang.Integer.parseInt(partitionKey.substring(offset + 1)) %
        a_numPartitions
    }
    partition
  }

  // NOTE: delegates with a hard-coded partition count of 10 instead of
  // querying the cluster metadata.
  override def partition(topic: String,
                         key: AnyRef,
                         keyBytes: Array[Byte],
                         value: AnyRef,
                         valueBytes: Array[Byte],
                         cluster: Cluster): Int = partition(key, 10)

  override def close() {
  }

  override def configure(configs: util.Map[String, _]) {
  }
} 
Developer: PacktPublishing · Project: Fast-Data-Processing-Systems-with-SMACK-Stack · Lines: 41 · Source: SimplePartitioner.scala
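Worked examples of the suffix-based routing (the constructor argument is unused by partition, so null suffices for a quick check):

val p = new SimplePartitioner(null)
p.partition("sensor.17", 10) // suffix 17 -> 17 % 10 == 7
p.partition("plain-key", 10) // no '.' suffix -> partition 0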


Example 7: toString

// Package declaration and imported dependencies
package org.kongo.kafka.metrics.config

import java.util.regex.Pattern

import kafka.utils.VerifiableProperties
import org.kongo.kafka.metrics.Dimension
import org.kongo.kafka.metrics.RegexMetricPredicate

import scala.util.Try


  // NOTE: the source excerpt is clipped here; the class declaration for
  // KafkaStatsdReporterConfig and the host/port/prefix/enabled/dimensions
  // fields it references are missing from the original listing.
  val pollingIntervalSecs: Int = behavior.getInt("kafka.metrics.polling.interval.secs", 10)

  override def toString: String = {
    val dims = dimensions.map(_.name).mkString(",")
    s"[host=$host, port=$port, prefix=$prefix, enabled=$enabled, dimensions=($dims), polling-interval=$pollingIntervalSecs]"
  }

  private def pattern(key: String): Option[Pattern] = {
    val propsKey = s"$ConfigBase.$key"
    if (behavior.contains(propsKey))
      Try(Pattern.compile(behavior.getString(propsKey, null))).toOption
    else
      None
  }
}

object KafkaStatsdReporterConfig {
  val ConfigBase = "external.kafka.statsd.metrics"

  def apply(props: VerifiableProperties): KafkaStatsdReporterConfig =
    new KafkaStatsdReporterConfig(new VerifiableConfigBehavior(props))

  def apply(map: java.util.Map[String, _]): KafkaStatsdReporterConfig =
    new KafkaStatsdReporterConfig(new PropertiesMapBehavior(map))
} 
Developer: kongo2002 · Project: kafka-statsd-reporter · Lines: 37 · Source: KafkaStatsdReporterConfig.scala
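Both apply overloads normalize their input into a common behavior before building the config. A sketch from raw properties; the host key is an assumption following the ConfigBase prefix, since the field definitions are clipped above:

val props = new java.util.Properties()
props.put("external.kafka.statsd.metrics.host", "statsd.local") // assumed key shape
val config = KafkaStatsdReporterConfig(new VerifiableProperties(props))
println(config) // rendered by the toString shown above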


Example 8: TestUtils

// Package declaration and imported dependencies
package org.kongo.kafka.metrics

import java.util.Collections
import java.util.Properties

import kafka.utils.VerifiableProperties
import org.apache.kafka.common.{MetricName => KMetricName}
import org.apache.kafka.common.Metric
import org.kongo.kafka.metrics.config.KafkaStatsdReporterConfig
import org.kongo.kafka.metrics.config.PropertiesMapBehavior
import org.kongo.kafka.metrics.config.VerifiableConfigBehavior

object TestUtils {

  def dummyKafkaMetric: Metric = {
    new Metric {
      override def metricName(): KMetricName = new KMetricName("name", "group", "description", Collections.emptyMap())
      override def value(): Double = 0d
    }
  }

  def singletonVerifiablePropertiesBehavior(key: String, value: AnyRef): VerifiableConfigBehavior =
    new VerifiableConfigBehavior(singletonVerifiableProperties(key, value))

  def singletonVerifiableProperties(key: String, value: AnyRef): VerifiableProperties = {
    val props = new Properties
    props.put(key, value)
    new VerifiableProperties(props)
  }

  def emptyVerifiableConfig: KafkaStatsdReporterConfig =
    KafkaStatsdReporterConfig(new VerifiableProperties())

  def emptyVerifiableConfigBehavior: VerifiableConfigBehavior = {
    val props = new VerifiableProperties()
    new VerifiableConfigBehavior(props)
  }

  def emptyMapConfig: KafkaStatsdReporterConfig =
    KafkaStatsdReporterConfig(Collections.emptyMap[String, String]())

  def emptyMapConfigBehavior: PropertiesMapBehavior = {
    new PropertiesMapBehavior(Collections.emptyMap())
  }
} 
Developer: kongo2002 · Project: kafka-statsd-reporter · Lines: 46 · Source: TestUtils.scala
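These helpers pair naturally with VerifiableProperties-based components, e.g. feeding a single key through the typed getter:

val vProps = TestUtils.singletonVerifiableProperties("serializer.encoding", "UTF-8")
vProps.getString("serializer.encoding", "UTF8") // "UTF-8"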



Note: the kafka.utils.VerifiableProperties examples in this article were collected from open-source projects hosted on GitHub and other code/documentation platforms. Copyright of each snippet remains with its original author; consult the corresponding project's license before distributing or reusing the code. Do not republish without permission.

