本文整理汇总了Scala中kafka.utils.VerifiableProperties类的典型用法代码示例。如果您正苦于以下问题:Scala VerifiableProperties类的具体用法?Scala VerifiableProperties怎么用?Scala VerifiableProperties使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了VerifiableProperties类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: SimpleConsumer
//设置package包名称以及导入依赖的类
package com.landoop.kafka.ws.core.operations
import java.util.Properties
import com.landoop.kafka.ws.KafkaConstants
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import kafka.utils.VerifiableProperties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
/** Factory methods that build [[KafkaConsumer]] instances from a broker list. */
object SimpleConsumer {

  // Fully-qualified class name of Kafka's StringDeserializer.
  private val StringDeserializerName: String = classOf[StringDeserializer].getName

  /** Sets MAX_POLL_RECORDS on `props` only when `maxRecords` is defined and strictly positive. */
  private def putMaxPollRecords(props: Properties, maxRecords: Option[Int]): Unit =
    maxRecords.filter(_ > 0).foreach { max =>
      props.put(KafkaConstants.MAX_POLL_RECORDS, max.toString)
    }

  /**
   * Get a simple string consumer.
   *
   * Both the key and the value deserializer are set to the StringDeserializer,
   * whatever `T` is.
   * NOTE(review): a `T` other than String is presumably unsafe at read time — confirm with callers.
   *
   * @param kafkaBrokers value stored under the bootstrap-server key
   * @param maxRecords   optional cap on records per poll; ignored when empty or not positive
   * @return a new, unsubscribed consumer
   */
  def getConsumer[T](kafkaBrokers: String, maxRecords: Option[Int] = None): KafkaConsumer[T, T] = {
    val props = new Properties()
    props.put(KafkaConstants.BOOTSTRAP_SERVER, kafkaBrokers)
    props.put(KafkaConstants.KEY_DESERIALIZER, StringDeserializerName)
    props.put(KafkaConstants.VALUE_DESERIALIZER, StringDeserializerName)
    putMaxPollRecords(props, maxRecords)
    new KafkaConsumer[T, T](props)
  }

  /**
   * Builds a String/String consumer that belongs to the given consumer group,
   * with auto-commit disabled and a 30000 ms session timeout.
   *
   * @param kafkaBrokers value stored under the bootstrap-servers key
   * @param group        consumer group id; must be longer than one character
   * @return a new, unsubscribed consumer
   */
  def createNewConsumerWithConsumerGroup(kafkaBrokers: String, group: String): KafkaConsumer[String, String] = {
    // Kept as `assert` so the failure type seen by existing callers does not change.
    assert(group.length > 1, "Invalid group length")
    val properties = new Properties()
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, group)
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializerName)
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializerName)
    new KafkaConsumer[String, String](properties)
  }

  /**
   * Builds a consumer with String keys and values read through the
   * KafkaAvroDeserializer.
   *
   * @param kafkaBrokers   value stored under the bootstrap-server key
   * @param schemaRegistry value stored under the schema-registry URL key
   * @param maxRecords     optional cap on records per poll; ignored when empty or not positive
   * @return a new, unsubscribed consumer
   */
  def getStringAvroConsumer[T](kafkaBrokers: String, schemaRegistry: String, maxRecords: Option[Int] = None): KafkaConsumer[String, T] = {
    val props = new Properties()
    props.put(KafkaConstants.BOOTSTRAP_SERVER, kafkaBrokers)
    props.put(KafkaConstants.KEY_DESERIALIZER, StringDeserializerName)
    props.put(KafkaConstants.VALUE_DESERIALIZER, classOf[KafkaAvroDeserializer].getCanonicalName)
    props.put(KafkaConstants.SCHEMA_REGISTRY_URL, schemaRegistry)
    putMaxPollRecords(props, maxRecords)
    new KafkaConsumer[String, T](props)
  }
}
开发者ID:Landoop,项目名称:kafka-ws,代码行数:58,代码来源:SimpleConsumer.scala
示例2: AvroFlumeEventEncoder
//设置package包名称以及导入依赖的类
package contrib.kafka.serializer
import kafka.serializer.Encoder
import kafka.utils.VerifiableProperties
import org.apache.avro.io.BinaryEncoder
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.flume.source.avro.AvroFlumeEvent
import java.io.ByteArrayOutputStream
import java.io.InputStream
/**
 * Kafka `Encoder` that serializes an `AvroFlumeEvent` to Avro binary.
 *
 * One output buffer and one `BinaryEncoder` are reused across calls, and this
 * class does no synchronization of its own.
 *
 * @param props accepted as a constructor argument but not read by this class
 */
class AvroFlumeEventEncoder(props: VerifiableProperties = null)
    extends Encoder[AvroFlumeEvent] {

  private val writer: SpecificDatumWriter[AvroFlumeEvent] =
    new SpecificDatumWriter[AvroFlumeEvent](classOf[AvroFlumeEvent])

  // Starts out null; each call passes the previous encoder back to the factory.
  private var encoder: BinaryEncoder = _

  // Never reassigned, only reset, so it is a val.
  private val tempOutStream = new ByteArrayOutputStream()

  /**
   * Writes `event` into the shared buffer and returns a copy of its bytes.
   *
   * @param event the event to serialize
   * @return the bytes written for this event
   */
  override def toBytes(event: AvroFlumeEvent): Array[Byte] = {
    // Drop whatever the previous call left in the buffer.
    tempOutStream.reset()
    encoder = EncoderFactory.get.directBinaryEncoder(tempOutStream, encoder)
    writer.write(event, encoder)
    tempOutStream.toByteArray
  }
}
开发者ID:saikocat,项目名称:spark-sql-kafka-avroflumeevent,代码行数:30,代码来源:AvroFlumeEventEncoder.scala
示例3: AvroFlumeEventDecoder
//设置package包名称以及导入依赖的类
package contrib.kafka.serializer
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
import org.apache.avro.io.BinaryDecoder
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader
import org.apache.flume.Event
import org.apache.flume.event.EventBuilder
import org.apache.flume.source.avro.AvroFlumeEvent
import java.io.ByteArrayInputStream
/**
 * Kafka `Decoder` that reads an Avro-binary `AvroFlumeEvent` and converts it
 * to a Flume `Event`.
 *
 * The `BinaryDecoder` is reused across calls, and this class does no
 * synchronization of its own.
 *
 * @param props accepted as a constructor argument but not read by this class
 */
class AvroFlumeEventDecoder(props: VerifiableProperties = null)
    extends Decoder[Event] {

  private val reader: SpecificDatumReader[AvroFlumeEvent] =
    new SpecificDatumReader[AvroFlumeEvent](classOf[AvroFlumeEvent])

  // Starts out null; each call passes the previous decoder back to the factory.
  private var decoder: BinaryDecoder = _

  /**
   * Decodes one message.
   *
   * @param bytes Avro-binary encoding of an `AvroFlumeEvent`
   * @return an `Event` built from the decoded body and headers
   */
  override def fromBytes(bytes: Array[Byte]): Event = {
    val inputStream = new ByteArrayInputStream(bytes)
    decoder = DecoderFactory.get.directBinaryDecoder(inputStream, decoder)
    val avroEvent: AvroFlumeEvent = reader.read(null, decoder)
    // NOTE(review): `array` exposes the buffer's whole backing array; this assumes the
    // decoded body buffer is exactly sized — confirm.
    EventBuilder.withBody(
      avroEvent.getBody.array,
      toStringJavaMap(avroEvent.getHeaders))
  }

  /**
   * Converts a map of `CharSequence` pairs into a map of `String` pairs by
   * calling `toString` on every key and value.
   *
   * @param charSeqMap the source map
   * @return a Java map view over the converted entries
   */
  def toStringJavaMap(
      charSeqMap: java.util.Map[CharSequence, CharSequence]): java.util.Map[String, String] = {
    // Explicit converters instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._
    charSeqMap.asScala.map { case (k, v) => (k.toString, v.toString) }.asJava
  }
}
开发者ID:saikocat,项目名称:spark-sql-kafka-avroflumeevent,代码行数:40,代码来源:AvroFlumeEventDecoder.scala
示例4: DefaultDecoder
//设置package包名称以及导入依赖的类
package me.jie.ksrdd
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
/**
 * Pass-through decoder: hands the message bytes back unchanged.
 *
 * @param props accepted as a constructor argument but not read by this class
 */
class DefaultDecoder(props: VerifiableProperties = null)
    extends Decoder[Array[Byte]]
    with Serializable {

  /** Returns the very same array it was given. */
  def fromBytes(bytes: Array[Byte]): Array[Byte] = {
    bytes
  }
}
/**
 * Decodes message bytes into a `String`.
 *
 * @param props optional configuration; when non-null its "serializer.encoding"
 *              entry is read, with "UTF8" passed as the fallback
 */
class StringDecoder(props: VerifiableProperties = null)
    extends Decoder[String]
    with Serializable {

  // "UTF8" when no properties were supplied, otherwise whatever the properties report.
  val encoding =
    Option(props).fold("UTF8")(configured => configured.getString("serializer.encoding", "UTF8"))

  /** Builds a string from `bytes` using [[encoding]]. */
  def fromBytes(bytes: Array[Byte]): String = new String(bytes, encoding)
}
开发者ID:JensenFeng,项目名称:KSRdd,代码行数:23,代码来源:Decoder.scala
示例5: AvroDecoder
//设置package包名称以及导入依赖的类
package com.kakao.cuesheet.convert
import java.util.Arrays.copyOfRange
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
/**
 * Shared base for the Avro decoders in this file: parses message bytes into a
 * `GenericRecord`, optionally dropping a fixed number of leading bytes first.
 *
 * Implementations supply `props`; the schema (key `Avro.SCHEMA`) and the
 * number of bytes to skip (key `Avro.SKIP_BYTES`, 0 passed as the fallback)
 * are read from it while the trait initializes.
 */
sealed trait AvroDecoder[T] extends Decoder[T] {

  def props: VerifiableProperties

  // Declaration order matters: `reader` needs `schema`, `decoder` needs `reader`.
  protected val schema = new Schema.Parser().parse(props.getString(Avro.SCHEMA))
  protected val skipBytes = props.getInt(Avro.SKIP_BYTES, 0)
  protected val reader = new GenericDatumReader[GenericRecord](schema)
  protected val decoder = Avro.recordDecoder(reader)

  /** Returns `bytes` without its first `size` bytes, or an empty array when nothing would remain. */
  private def skip(bytes: Array[Byte], size: Int): Array[Byte] = {
    val total = bytes.length
    if (total - size > 0) copyOfRange(bytes, size, total)
    else Array.empty[Byte]
  }

  /**
   * Decodes `bytes` into a record, skipping the configured prefix if any.
   *
   * @param bytes the raw message
   * @return the record produced by `decoder`
   */
  def parse(bytes: Array[Byte]): GenericRecord = {
    val payload = skipBytes match {
      case 0 => bytes
      case prefix => skip(bytes, prefix)
    }
    decoder(payload)
  }
}
/** Decoder that returns the parsed `GenericRecord` as is. */
class AvroRecordDecoder(val props: VerifiableProperties)
    extends AvroDecoder[GenericRecord] {

  /** Delegates straight to `parse`. */
  override def fromBytes(bytes: Array[Byte]): GenericRecord = {
    parse(bytes)
  }
}
/** Decoder that parses a record and converts it with `Avro.toMap`. */
class AvroMapDecoder(val props: VerifiableProperties)
    extends AvroDecoder[Map[String, Any]] {

  /** Parses `bytes`, then hands the record to `Avro.toMap`. */
  override def fromBytes(bytes: Array[Byte]): Map[String, Any] = {
    val record = parse(bytes)
    Avro.toMap(record)
  }
}
/** Decoder that parses a record and converts it with `Avro.toJson`. */
class AvroJsonDecoder(val props: VerifiableProperties)
    extends AvroDecoder[String] {

  /** Parses `bytes`, then hands the record to `Avro.toJson`. */
  override def fromBytes(bytes: Array[Byte]): String = {
    val record = parse(bytes)
    Avro.toJson(record)
  }
}
开发者ID:kakao,项目名称:cuesheet,代码行数:46,代码来源:AvroDecoder.scala
示例6: SimplePartitioner
//设置package包名称以及导入依赖的类
package packt.ch05
import java.util
import kafka.utils.VerifiableProperties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster
/** Companion of the `SimplePartitioner` class. */
object SimplePartitioner {

  // Holder for a producer instance; nothing in the visible code reads or assigns it.
  private var producer: KafkaProducer[String, String] = null
}
/**
 * Partitioner that routes on the numeric suffix after the last '.' of a
 * String key (for example "host.7" goes to partition 7 modulo the partition count).
 *
 * @param props accepted as a constructor argument but not read by this class
 */
class SimplePartitioner(props: VerifiableProperties) extends Partitioner {

  /**
   * Computes the partition for `key`.
   *
   * Returns 0 when the key is null or not a String, when it has no '.' after
   * its first character, when the suffix is not an integer, or when
   * `a_numPartitions` is not positive.
   *
   * @param key             the record key
   * @param a_numPartitions number of partitions to spread over
   * @return a partition index in `[0, a_numPartitions)`, or 0 in the fallback cases
   */
  def partition(key: AnyRef, a_numPartitions: Int): Int = key match {
    case partitionKey: String if a_numPartitions > 0 =>
      val offset = partitionKey.lastIndexOf('.')
      if (offset > 0) {
        scala.util.Try(java.lang.Integer.parseInt(partitionKey.substring(offset + 1)))
          .map { suffix =>
            // `%` keeps the sign of the dividend; shift negatives back into range.
            val remainder = suffix % a_numPartitions
            if (remainder < 0) remainder + a_numPartitions else remainder
          }
          .getOrElse(0)
      } else {
        0
      }
    case _ => 0
  }

  /**
   * Kafka entry point: partitions on `key` using the topic's partition count
   * taken from `cluster`.
   */
  override def partition(topic: String,
                         key: AnyRef,
                         keyBytes: Array[Byte],
                         value: AnyRef,
                         valueBytes: Array[Byte],
                         cluster: Cluster): Int = {
    // Guard against a null list for an unknown topic; 0 partitions falls back to partition 0.
    val numPartitions = Option(cluster.partitionsForTopic(topic)).map(_.size).getOrElse(0)
    partition(key, numPartitions)
  }

  /** Nothing to release. */
  override def close(): Unit = ()

  /** No configuration is read. */
  override def configure(configs: util.Map[String, _]): Unit = ()
}
开发者ID:PacktPublishing,项目名称:Fast-Data-Processing-Systems-with-SMACK-Stack,代码行数:41,代码来源:SimplePartitioner.scala
示例7: KafkaStatsdReporterConfig
//设置package包名称以及导入依赖的类
package org.kongo.kafka.metrics.config
import java.util.regex.Pattern
import kafka.utils.VerifiableProperties
import org.kongo.kafka.metrics.Dimension
import org.kongo.kafka.metrics.RegexMetricPredicate
import scala.util.Try
// Polling interval in seconds, read from the "kafka.metrics.polling.interval.secs" key.
// NOTE(review): 10 is presumably the value used when the key is absent — confirm against behavior.getInt.
val pollingIntervalSecs: Int = behavior.getInt("kafka.metrics.polling.interval.secs", 10)
/** Renders host, port, prefix, enabled flag, dimension names and polling interval on one line. */
override def toString: String = {
  // Dimension names joined by commas, without spaces.
  val dimensionNames = dimensions.map(dimension => dimension.name).mkString(",")
  s"[host=$host, port=$port, prefix=$prefix, enabled=$enabled, dimensions=($dimensionNames), polling-interval=$pollingIntervalSecs]"
}
/**
 * Looks up `ConfigBase.key` and compiles its value as a regex.
 *
 * @param key suffix appended to `ConfigBase` with a dot
 * @return the compiled pattern, or `None` when the key is absent or compilation throws
 */
private def pattern(key: String): Option[Pattern] = {
  val propsKey = s"$ConfigBase.$key"
  Some(propsKey)
    .filter(name => behavior.contains(name))
    .flatMap(name => Try(Pattern.compile(behavior.getString(name, null))).toOption)
}
}
/** Factory methods and shared constants for `KafkaStatsdReporterConfig`. */
object KafkaStatsdReporterConfig {

  // Prefix used when building property keys.
  val ConfigBase = "external.kafka.statsd.metrics"

  /** Builds a config backed by Kafka `VerifiableProperties`. */
  def apply(props: VerifiableProperties): KafkaStatsdReporterConfig = {
    val behavior = new VerifiableConfigBehavior(props)
    new KafkaStatsdReporterConfig(behavior)
  }

  /** Builds a config backed by a plain Java map. */
  def apply(map: java.util.Map[String, _]): KafkaStatsdReporterConfig = {
    val behavior = new PropertiesMapBehavior(map)
    new KafkaStatsdReporterConfig(behavior)
  }
}
开发者ID:kongo2002,项目名称:kafka-statsd-reporter,代码行数:37,代码来源:KafkaStatsdReporterConfig.scala
示例8: TestUtils
//设置package包名称以及导入依赖的类
package org.kongo.kafka.metrics
import java.util.Collections
import java.util.Properties
import kafka.utils.VerifiableProperties
import org.apache.kafka.common.{MetricName => KMetricName}
import org.apache.kafka.common.Metric
import org.kongo.kafka.metrics.config.KafkaStatsdReporterConfig
import org.kongo.kafka.metrics.config.PropertiesMapBehavior
import org.kongo.kafka.metrics.config.VerifiableConfigBehavior
/** Fixtures for the reporter tests: a dummy metric plus empty and single-entry configurations. */
object TestUtils {

  /** A metric with fixed name parts ("name", "group", "description"), no tags and value 0. */
  def dummyKafkaMetric: Metric = new Metric {
    override def metricName(): KMetricName = {
      new KMetricName("name", "group", "description", Collections.emptyMap())
    }

    override def value(): Double = 0d
  }

  /** Config behavior over properties holding exactly one entry. */
  def singletonVerifiablePropertiesBehavior(key: String, value: AnyRef): VerifiableConfigBehavior = {
    val verifiable = singletonVerifiableProperties(key, value)
    new VerifiableConfigBehavior(verifiable)
  }

  /** `VerifiableProperties` holding exactly one entry. */
  def singletonVerifiableProperties(key: String, value: AnyRef): VerifiableProperties = {
    val single = new Properties
    single.put(key, value)
    new VerifiableProperties(single)
  }

  /** Reporter config built from empty `VerifiableProperties` (name spelling kept for existing callers). */
  def emptyVerfiableConfig: KafkaStatsdReporterConfig = {
    val empty = new VerifiableProperties()
    KafkaStatsdReporterConfig(empty)
  }

  /** Config behavior over empty `VerifiableProperties`. */
  def emptyVerifiableConfigBehavior: VerifiableConfigBehavior =
    new VerifiableConfigBehavior(new VerifiableProperties())

  /** Reporter config built from an empty Java map. */
  def emptyMapConfig: KafkaStatsdReporterConfig = {
    val empty = Collections.emptyMap[String, String]()
    KafkaStatsdReporterConfig(empty)
  }

  /** Config behavior over an empty Java map. */
  def emptyMapConfigBehavior: PropertiesMapBehavior =
    new PropertiesMapBehavior(Collections.emptyMap())
}
开发者ID:kongo2002,项目名称:kafka-statsd-reporter,代码行数:46,代码来源:TestUtils.scala
注:本文中的kafka.utils.VerifiableProperties类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论