• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala ErrorMapping类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中kafka.common.ErrorMapping的典型用法代码示例。如果您正苦于以下问题:Scala ErrorMapping类的具体用法?Scala ErrorMapping怎么用?Scala ErrorMapping使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ErrorMapping类的3个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: HelloKafka

//设置package包名称以及导入依赖的类
package jp.gr.java_conf.massakai.application

import jp.gr.java_conf.massakai.kafka.{Partition, Config, KafkaConsumer}
import kafka.javaapi.PartitionMetadata
import scala.io.Source
import org.json4s.native.JsonMethods._
import kafka.common.ErrorMapping

/** Demo entry point.
  *
  * Loads the configuration from a JSON file, looks up the leader broker for the first
  * partition of the first configured topic, fetches one batch of messages starting at
  * the offset returned by `getLastOffset`, and prints every message to standard output.
  */
object HelloKafka extends App {
  // FIXME: (original note was lost to mis-encoding) the path is hard-coded and
  // resolved relative to the working directory.
  val configPath = "src/main/resources/config.json"
  val configSource = Source.fromFile(configPath)
  // Read the whole file eagerly and release the file handle even when parsing fails.
  val configJson =
    try parse(configSource.mkString)
    finally configSource.close()

  implicit val formats = org.json4s.DefaultFormats
  val config: Config = configJson.extract[Config]

  // TODO: (original note unreadable) only the first configured topic is consumed.
  val topic = config.topic.head
  val topicName = topic.name
  // TODO: (original note unreadable) only the first partition of that topic is consumed.
  val partition: Partition = topic.partition.head
  val partitionId = partition.id

  // TODO: (original note unreadable) `.get` fails hard when no leader can be found.
  val partitionMetadata: PartitionMetadata = KafkaConsumer.findLeader(config.bootstrap, topicName, partitionId).get
  val leaderBroker = partitionMetadata.leader
  val clientName = s"HelloKafka_${leaderBroker.host}_${leaderBroker.port}"
  val consumer = new KafkaConsumer(
    leaderBroker.host,
    leaderBroker.port,
    config.consumer.soTimeout,
    config.consumer.bufferSize,
    clientName)

  // Mutable on purpose: advanced past every message that gets printed.
  var readOffset = consumer.getLastOffset(topicName, partitionId, System.currentTimeMillis()).get
  val response = consumer.getMessages(topicName, partitionId, readOffset, config.consumer.fetchSize)
  if (response.hasError) {
    // TODO: (original note unreadable) errors are only reported, not recovered from.
    response.errorCode(topicName, partitionId) match {
      case ErrorMapping.OffsetOutOfRangeCode => println("Error: Offset out of range")
      case _ => println("Error")
    }
  } else {
    val messages = response.messageSet(topicName, partitionId).iterator()
    while (messages.hasNext) {
      val entry = messages.next()
      val currentOffset = entry.offset
      if (currentOffset < readOffset) {
        // The fetched set may contain entries older than the requested offset; skip them.
        println(s"Found an old offset: $currentOffset Expecting: $readOffset")
      } else {
        val payload = entry.message.payload
        val bytes = new Array[Byte](payload.limit)
        payload.get(bytes)
        // NOTE(review): decodes with the platform default charset — confirm the producer's encoding.
        val message = new String(bytes)
        println(s"$currentOffset: $message")
        readOffset = entry.nextOffset
      }
    }
  }
}
开发者ID:massakai,项目名称:finagle-kafka-sample,代码行数:62,代码来源:HelloKafka.scala


示例2: kafkaHelper

//设置package包名称以及导入依赖的类
package me.jie.ksrdd

import kafka.api.TopicMetadataRequest
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.consumer.SimpleConsumer


/** Locates partition leaders through the configured metadata brokers and opens
  * consumers against them.
  *
  * @param config supplies the metadata broker list, socket settings, consumer id and
  *               the retry count / back-off used by [[findLeader]]
  */
class kafkaHelper(config: kafkaConfig) {
    import scala.util.control.NonFatal

    private val brokers = config.metadataBrokerList.split(",").map(kafkaBroker(_))
    private val socketTimeoutMs = config.socketTimeoutMs
    private val socketReceiveBufferBytes = config.socketReceiveBufferBytes
    private val consumerId = config.consumerId
    private val retries = config.retries
    private val refreshLeaderBackoffMs = config.refreshLeaderBackoffMs

    /** Finds the current leader of the given topic partition.
      *
      * Every metadata broker is queried in order until one reports a leader. When none
      * does, the call sleeps for `refreshLeaderBackoffMs` and starts the next round, for
      * at most `retries` rounds.
      *
      * @param topicAndPartition the partition whose leader is wanted
      * @return the leader's host and port
      * @throws Exception with message "Find leader failed!" when every round came back empty
      */
    def findLeader(topicAndPartition: TopicAndPartition): kafkaBroker = {
      // One round: lazily walk the brokers and stop at the first one that knows the leader.
      def askAllBrokers(): Option[kafkaBroker] =
        brokers.iterator
          .map(seed => leaderReportedBy(seed, topicAndPartition))
          .collectFirst { case Some(leader) => leader }

      (1 to retries).iterator
        .map { _ =>
          val found = askAllBrokers()
          // Back off after an unsuccessful round, as the original did (including the last one).
          if (found.isEmpty) Thread.sleep(refreshLeaderBackoffMs)
          found
        }
        .collectFirst { case Some(leader) => leader }
        .getOrElse(throw new Exception("Find leader failed!"))
    }

    /** Asks a single broker for the topic metadata and extracts the partition leader.
      *
      * Entries for other topics/partitions or with a non-zero error code are skipped
      * (filtered) rather than matched with a guarded partial function, which would
      * raise `MatchError` on the first non-matching entry.
      *
      * @return the leader if this broker reported one; `None` when it did not or when
      *         the request failed with a non-fatal error
      */
    private def leaderReportedBy(seed: kafkaBroker, topicAndPartition: TopicAndPartition): Option[kafkaBroker] = {
      val consumer = new SimpleConsumer(seed.host, seed.port, socketTimeoutMs, socketReceiveBufferBytes, consumerId)
      try {
        // All collection operations below are strict, so they finish before `finally` closes the consumer.
        consumer.send(new TopicMetadataRequest(Seq(topicAndPartition.topic), 0)).topicsMetadata
          .filter(tm => tm.errorCode == ErrorMapping.NoError && tm.topic == topicAndPartition.topic)
          .flatMap(_.partitionsMetadata)
          .filter(pm => pm.errorCode == ErrorMapping.NoError && pm.partitionId == topicAndPartition.partition)
          .flatMap(_.leader.toList)
          .headOption
          .map(leader => kafkaBroker(leader.host, leader.port))
      } catch {
        // Best effort: an unreachable or failing broker just means "try the next one".
        // Fatal errors (OOM, interruption, ...) are deliberately left to propagate.
        case NonFatal(_) => None
      } finally {
        consumer.close()
      }
    }

    /** Opens a new consumer connected to the given broker, using the configured
      * socket timeout, receive buffer size and consumer id. The caller is responsible
      * for closing it.
      */
    def buildConsumer(broker: kafkaBroker): SimpleConsumer = {
      val kafkaBroker(leaderHost, leaderPort) = broker
      new SimpleConsumer(leaderHost, leaderPort, socketTimeoutMs, socketReceiveBufferBytes, consumerId)
    }
}
开发者ID:JensenFeng,项目名称:KSRdd,代码行数:52,代码来源:kafkaHelper.scala


示例3: kafkaOffsetSeeker

//设置package包名称以及导入依赖的类
package me.jie.ksrdd

import java.util.Properties

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, PartitionOffsetsResponse}
import kafka.common.{ErrorMapping, TopicAndPartition}

import scala.annotation.tailrec


/** Looks up partition offsets by asking the partition leader.
  *
  * @param kafkaProps properties from which the [[kafkaConfig]] is built
  */
class kafkaOffsetSeeker(kafkaProps: Properties) {
  private val config = kafkaConfig(kafkaProps)

  private val kafkaHelper = new kafkaHelper(config)
  import kafkaHelper.{findLeader, buildConsumer}

  // Special "time" values passed to the offset request by earliestOffset / latestOffset.
  // NOTE(review): these look like Kafka's OffsetRequest.EarliestTime / LatestTime sentinels — confirm.
  private val earliest = -2
  private val latest = -1

  /** Requests at most one offset before `timeMillis` from the partition leader.
    *
    * On a non-zero error code the leader is looked up again and the request repeated
    * after sleeping `config.refreshLeaderBackoffMs`, up to `config.retries` retries.
    *
    * @param topicAndPartition partition to query
    * @param timeMillis        timestamp (or special time value) sent in the request
    * @return the first offset returned, or `None` when the leader returned no offsets
    * @throws Throwable the exception mapped from the last error code once retries are exhausted
    */
  def possibleOffsetBefore(topicAndPartition: TopicAndPartition, timeMillis: Long): Option[Long] = {
    val requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(timeMillis, 1))
    val request = OffsetRequest(requestInfo = requestInfo)

    @tailrec
    def fetchWithRetry(retries: Int): Option[Long] = {
      val leader = buildConsumer(findLeader(topicAndPartition))
      // Close the consumer even when the request or the response lookup throws;
      // the try/finally ends before the recursive call so the method stays tail-recursive.
      val PartitionOffsetsResponse(error, offsets) =
        try leader.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
        finally leader.close()

      (error, retries) match {
        case (ErrorMapping.NoError, _) => offsets.headOption
        // `config.retries` is a stable-identifier pattern: matches when the attempt counter equals it.
        case (_, config.retries) => throw ErrorMapping.exceptionFor(error)
        case (_, _) => Thread.sleep(config.refreshLeaderBackoffMs); fetchWithRetry(retries + 1)
      }
    }

    fetchWithRetry(0)
  }

  /** Offset lookup using the `earliest` special time value. */
  def earliestOffset(topicAndPartition: TopicAndPartition): Option[Long] =
    possibleOffsetBefore(topicAndPartition, earliest)

  /** Offset lookup using the `latest` special time value. */
  def latestOffset(topicAndPartition: TopicAndPartition): Option[Long] =
    possibleOffsetBefore(topicAndPartition, latest)
}
开发者ID:JensenFeng,项目名称:KSRdd,代码行数:47,代码来源:kafkaOffsetSeeker.scala



注:本文中的kafka.common.ErrorMapping类示例整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala File类代码示例发布时间:2022-05-23
下一篇:
Scala ConsoleReader类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap