本文整理汇总了Scala中kafka.common.ErrorMapping类的典型用法代码示例。如果您正苦于以下问题:Scala ErrorMapping类的具体用法?Scala ErrorMapping怎么用?Scala ErrorMapping使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ErrorMapping类的3个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: HelloKafka
//设置package包名称以及导入依赖的类
package jp.gr.java_conf.massakai.application
import jp.gr.java_conf.massakai.kafka.{Partition, Config, KafkaConsumer}
import kafka.javaapi.PartitionMetadata
import scala.io.Source
import org.json4s.native.JsonMethods._
import kafka.common.ErrorMapping
/**
 * Sample entry point: reads one fetch worth of messages from a single Kafka partition
 * and prints them to standard output.
 *
 * Steps, in order:
 *  1. Load the JSON configuration from `configPath` and extract it into a `Config`.
 *  2. Take the first configured topic and its first configured partition.
 *  3. Look up the partition leader through the bootstrap brokers.
 *  4. Ask the leader for an offset using the current wall-clock time, then fetch from it.
 *  5. Print every message whose offset is not older than the requested one.
 */
object HelloKafka extends App {
  // FIXME(review): the path is hard-coded and resolved against the working directory.
  val configPath = "src/main/resources/config.json"
  val configSource = Source.fromFile(configPath)
  // Read the whole file, then release the file handle even if parsing fails.
  val configJson =
    try parse(configSource.mkString)
    finally configSource.close()
  implicit val formats = org.json4s.DefaultFormats
  val config: Config = configJson.extract[Config]

  // TODO(review): only the first configured topic is consumed; `head` throws if none is configured.
  val topic = config.topic.head
  val topicName = topic.name

  // TODO(review): only the first partition of that topic is consumed.
  val partition: Partition = topic.partition.head
  val partitionId = partition.id

  // TODO(review): `.get` fails hard when no leader can be found for the partition.
  val partitionMetadata: PartitionMetadata =
    KafkaConsumer.findLeader(config.bootstrap, topicName, partitionId).get
  val leaderBroker = partitionMetadata.leader
  val clientName = s"HelloKafka_${leaderBroker.host}_${leaderBroker.port}"

  val consumer = new KafkaConsumer(
    leaderBroker.host,
    leaderBroker.port,
    config.consumer.soTimeout,
    config.consumer.bufferSize,
    clientName)

  // Starting offset as reported by the leader for "now"; advanced as messages are printed.
  var readOffset = consumer.getLastOffset(topicName, partitionId, System.currentTimeMillis()).get
  val response = consumer.getMessages(topicName, partitionId, readOffset, config.consumer.fetchSize)

  if (response.hasError) {
    // TODO(review): errors are only reported, nothing is retried or recovered.
    response.errorCode(topicName, partitionId) match {
      case ErrorMapping.OffsetOutOfRangeCode => println("Error: Offset out of range")
      case _ => println("Error")
    }
  } else {
    val messageAndOffsetIterator = response.messageSet(topicName, partitionId).iterator()
    while (messageAndOffsetIterator.hasNext) {
      val messageAndOffset = messageAndOffsetIterator.next()
      val currentOffset = messageAndOffset.offset
      if (currentOffset < readOffset) {
        // The fetch returned a message that precedes the offset that was asked for; skip it.
        println(s"Found an old offset: $currentOffset Expecting: $readOffset")
      } else {
        val payload = messageAndOffset.message.payload
        val bytes = new Array[Byte](payload.limit)
        payload.get(bytes)
        // Decode with an explicit charset so the output does not depend on the JVM default.
        // NOTE(review): assumes producers write UTF-8 text — confirm against the producer side.
        val message = new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
        println(s"$currentOffset: $message")
        readOffset = messageAndOffset.nextOffset
      }
    }
  }
}
开发者ID:massakai,项目名称:finagle-kafka-sample,代码行数:62,代码来源:HelloKafka.scala
示例2: kafkaHelper
//设置package包名称以及导入依赖的类
package me.jie.ksrdd
import kafka.api.TopicMetadataRequest
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.consumer.SimpleConsumer
/**
 * Locates the leader broker of a topic partition and opens consumers against brokers.
 *
 * @param config supplies the comma-separated seed broker list, socket settings, consumer id,
 *               number of lookup rounds (`retries`) and the pause between rounds
 *               (`refreshLeaderBackoffMs`).
 */
class kafkaHelper(config: kafkaConfig) {
  private val brokers = config.metadataBrokerList.split(",").map(kafkaBroker(_))
  private val socketTimeoutMs = config.socketTimeoutMs
  private val socketReceiveBufferBytes = config.socketReceiveBufferBytes
  private val consumerId = config.consumerId
  private val retries = config.retries
  private val refreshLeaderBackoffMs = config.refreshLeaderBackoffMs

  /**
   * Finds the current leader of `topicAndPartition`.
   *
   * Performs up to `retries` rounds. In each round the seed brokers are asked one after
   * another until one of them reports a leader; a round without a result is followed by a
   * pause of `refreshLeaderBackoffMs`.
   *
   * @param topicAndPartition the partition whose leader is wanted
   * @return the leader as a `kafkaBroker`
   * @throws Exception with message "Find leader failed!" when no round produced a leader
   */
  def findLeader(topicAndPartition: TopicAndPartition): kafkaBroker =
    (1 to retries).toStream
      .map { _ =>
        // Streams are lazy: brokers after the first successful one are never contacted.
        val found = brokers.toStream
          .map(leaderReportedBy(_, topicAndPartition))
          .collectFirst { case Some(leader) => leader }
        if (found.isEmpty) Thread.sleep(refreshLeaderBackoffMs)
        found
      }
      .collectFirst { case Some(leader) => leader }
      .getOrElse(throw new Exception("Find leader failed!"))

  /**
   * Asks a single broker for the topic metadata and extracts the leader of the partition.
   *
   * Metadata entries that carry an error code, belong to another topic, or describe another
   * partition are skipped with `filter`. A guarded pattern inside `map`/`flatMap` would
   * instead throw `MatchError` on the first such entry and discard the whole answer.
   *
   * @return the leader reported by `broker`, or `None` when it reports none or the request
   *         fails with a non-fatal error
   */
  private def leaderReportedBy(broker: kafkaBroker, topicAndPartition: TopicAndPartition): Option[kafkaBroker] = {
    val consumer = new SimpleConsumer(broker.host, broker.port, socketTimeoutMs, socketReceiveBufferBytes, consumerId)
    try {
      consumer.send(new TopicMetadataRequest(Seq(topicAndPartition.topic), 0)).topicsMetadata.toStream
        .filter { topicMeta =>
          topicMeta.errorCode == ErrorMapping.NoError && topicMeta.topic == topicAndPartition.topic
        }
        .flatMap(_.partitionsMetadata)
        .filter { partitionMeta =>
          partitionMeta.errorCode == ErrorMapping.NoError &&
            partitionMeta.partitionId == topicAndPartition.partition
        }
        .map(_.leader)
        .collectFirst { case Some(leader) => kafkaBroker(leader.host, leader.port) }
    } catch {
      // Best effort per broker: an unreachable broker just yields no answer. Fatal errors
      // (out of memory, thread interruption, ...) are deliberately not swallowed.
      case scala.util.control.NonFatal(_) => None
    } finally {
      consumer.close()
    }
  }

  /**
   * Opens a `SimpleConsumer` against `broker` using the socket settings and consumer id
   * from the configuration. The caller is responsible for closing it.
   */
  def buildConsumer(broker: kafkaBroker): SimpleConsumer = {
    val kafkaBroker(leaderHost, leaderPort) = broker
    new SimpleConsumer(leaderHost, leaderPort, socketTimeoutMs, socketReceiveBufferBytes, consumerId)
  }
}
开发者ID:JensenFeng,项目名称:KSRdd,代码行数:52,代码来源:kafkaHelper.scala
示例3: kafkaOffsetSeeker
//设置package包名称以及导入依赖的类
package me.jie.ksrdd
import java.util.Properties
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, PartitionOffsetsResponse}
import kafka.common.{ErrorMapping, TopicAndPartition}
import scala.annotation.tailrec
/**
 * Looks up partition offsets by asking the partition leader.
 *
 * @param kafkaProps properties from which the `kafkaConfig` is built
 */
class kafkaOffsetSeeker(kafkaProps: Properties) {
  private val config = kafkaConfig(kafkaProps)
  private val kafkaHelper = new kafkaHelper(config)
  import kafkaHelper.{findLeader, buildConsumer}

  // Sentinel "times" passed to the offset request instead of a real timestamp.
  // NOTE(review): these match Kafka's OffsetRequest.EarliestTime / LatestTime — confirm
  // against the Kafka version in use.
  private val earliest = -2
  private val latest = -1

  /**
   * Requests at most one offset for `topicAndPartition` with the given time argument.
   *
   * The leader is looked up again on every attempt. A failed attempt is followed by a
   * pause of `config.refreshLeaderBackoffMs`; once `config.retries` retries have been used
   * up, the error is raised.
   *
   * @param topicAndPartition partition to query
   * @param timeMillis        time argument of the offset request (or one of the sentinels)
   * @return the first offset in the response, or `None` when the response holds no offset
   * @throws Throwable the exception `ErrorMapping` associates with the last error code
   */
  def possibleOffsetBefore(topicAndPartition: TopicAndPartition, timeMillis: Long): Option[Long] = {
    val requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(timeMillis, 1))
    val request = OffsetRequest(requestInfo = requestInfo)

    @tailrec
    def fetchWithRetry(retries: Int): Option[Long] = {
      val leader = buildConsumer(findLeader(topicAndPartition))
      // Close the consumer even when the request or the response lookup throws, so a
      // failing attempt does not leak its socket. The recursive call stays outside the
      // try block to keep it in tail position.
      val PartitionOffsetsResponse(error, offsets) =
        try leader.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
        finally leader.close()

      (error, retries) match {
        case (ErrorMapping.NoError, _) => offsets.headOption
        // `>=` rather than equality so a non-positive retry setting cannot loop forever.
        case (_, used) if used >= config.retries => throw ErrorMapping.exceptionFor(error)
        case (_, _) => Thread.sleep(config.refreshLeaderBackoffMs); fetchWithRetry(retries + 1)
      }
    }

    fetchWithRetry(0)
  }

  /** Offset returned for the `earliest` sentinel, if any. */
  def earliestOffset(topicAndPartition: TopicAndPartition): Option[Long] =
    possibleOffsetBefore(topicAndPartition, earliest)

  /** Offset returned for the `latest` sentinel, if any. */
  def latestOffset(topicAndPartition: TopicAndPartition): Option[Long] =
    possibleOffsetBefore(topicAndPartition, latest)
}
开发者ID:JensenFeng,项目名称:KSRdd,代码行数:47,代码来源:kafkaOffsetSeeker.scala
注:本文中的kafka.common.ErrorMapping类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论