• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala TopicAndPartition类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中kafka.common.TopicAndPartition的典型用法代码示例。如果您正苦于以下问题:Scala TopicAndPartition类的具体用法?Scala TopicAndPartition怎么用?Scala TopicAndPartition使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了TopicAndPartition类的7个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: OffsetManagement

//设置package包名称以及导入依赖的类
package org.hpi.esb.util

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.client.ClientUtils
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.api.TopicMetadata
import org.hpi.esb.commons.util.Logging

/**
  * Helpers for reading log offsets of a Kafka topic partition through the
  * legacy `SimpleConsumer` API.
  *
  * All failure paths log an error and terminate the JVM with exit code 1.
  */
object OffsetManagement extends Logging {

  /** Broker list used when the caller does not pass one explicitly. */
  private val DefaultBrokerList = "192.168.30.208:9092, 192.168.30.207:9092, 192.168.30.141:9092"

  /**
    * Returns the latest offset of the given partition.
    *
    * @param topic      topic to inspect
    * @param partition  partition id within the topic
    * @param brokerList comma separated `host:port` list used to fetch topic metadata;
    *                   defaults to the previously hard-coded cluster
    * @return the latest offset reported by the partition leader
    */
  def getNumberOfMessages(topic: String, partition: Int, brokerList: String = DefaultBrokerList): Long = {
    val clientId = "GetOffset"

    val topicsMetadata = getMetaData(topic, clientId, brokerList)
    getLatestOffset(topicsMetadata, topic, partition, clientId)
  }

  /**
    * Fetches the metadata of exactly one topic; exits the JVM if the response
    * does not consist of a single entry for that topic.
    */
  private def getMetaData(topic: String, clientId: String, brokerList: String): Seq[TopicMetadata] = {
    val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
    val maxWaitMs = 1000

    val topicsMetadata =
      ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
    if (topicsMetadata.size != 1 || topicsMetadata.head.topic != topic) {
      logger.error(s"Error: no valid topic metadata for topic: $topic, probably the topic does not exist, run kafka-list-topic.sh to verify")
      sys.exit(1)
    }

    topicsMetadata
  }

  /**
    * Asks the leader of `partition` for its latest offset.
    *
    * The consumer opened for the request is always closed again, even when the
    * request fails.
    */
  private def getLatestOffset(topicsMetadata: Seq[TopicMetadata], topic: String, partition: Int, clientId: String): Long = {
    // -1 asks the broker for the latest offset; a single offset is enough.
    val time = -1
    val nOffsets = 1

    topicsMetadata.head.partitionsMetadata.find(_.partitionId == partition) match {
      case Some(metadata) =>
        metadata.leader match {
          case Some(leader) =>
            val timeout = 10000
            val bufferSize = 100000
            val consumer = new SimpleConsumer(leader.host, leader.port, timeout, bufferSize, clientId)
            val offsets =
              try {
                val topicAndPartition = TopicAndPartition(topic, partition)
                val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
                consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
              } finally {
                // Release the socket regardless of the outcome of the request.
                consumer.close()
              }

            // An empty answer would otherwise surface as NoSuchElementException.
            offsets.lastOption.getOrElse {
              logger.error(s"Error: no offsets returned for partition $partition of topic $topic")
              sys.exit(1)
            }
          case None =>
            logger.error(s"Error: partition $partition does not have a leader. Skip getting offsets")
            sys.exit(1)
        }
      case None =>
        logger.error(s"Error: partition $partition does not exist")
        sys.exit(1)
    }
  }

}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:60,代码来源:OffsetManagement.scala


示例2: KafkaConsumer

//设置package包名称以及导入依赖的类
package jp.gr.java_conf.massakai.kafka

import java.util.Collections

import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder}
import kafka.common.TopicAndPartition
import kafka.javaapi._
import kafka.javaapi.consumer.SimpleConsumer
import collection.JavaConversions._

object KafkaConsumer {

  /**
    * Queries the bootstrap brokers one after another for the metadata of
    * `partition` of `topic` and returns the first match.
    *
    * Each lookup consumer is closed once its metadata request has completed,
    * whether it succeeded or threw.
    *
    * @param bootstraps brokers to ask, in order
    * @param topic      topic whose metadata is requested
    * @param partition  partition id to look for
    * @return the partition metadata from the first broker that knows the partition,
    *         or `None` if no broker reported it
    */
  def findLeader(bootstraps: Seq[Broker], topic: String, partition: Int): Option[PartitionMetadata] = {
    val topics = Collections.singletonList(topic)

    // Asks a single broker; the consumer never outlives this call.
    def lookup(bootstrap: Broker): Option[PartitionMetadata] = {
      val consumer = new SimpleConsumer(bootstrap.host, bootstrap.port, 100000, 64 * 1024, "leaderLookup")
      try {
        val metadata: java.util.List[TopicMetadata] = consumer.send(new TopicMetadataRequest(topics)).topicsMetadata
        val partitions = for {
          topicMetadata <- metadata
          partitionMetadata <- topicMetadata.partitionsMetadata
        } yield partitionMetadata
        partitions.find(_.partitionId == partition)
      } finally {
        consumer.close()
      }
    }

    // The iterator is lazy, so brokers after the first hit are never contacted.
    bootstraps.iterator.map(lookup).collectFirst { case Some(found) => found }
  }
}

/**
  * Thin wrapper around a `SimpleConsumer` connected to one broker.
  *
  * The underlying connection is created eagerly; call [[close]] when the
  * instance is no longer needed.
  *
  * @param leadBroker host of the broker to connect to
  * @param port       port of the broker
  * @param soTimeout  timeout passed to the `SimpleConsumer`
  * @param bufferSize buffer size passed to the `SimpleConsumer`
  * @param clientName client id sent with every request
  */
case class KafkaConsumer(leadBroker: String, port: Int, soTimeout: Int, bufferSize: Int, clientName: String) {
  val consumer = new SimpleConsumer(leadBroker, port, soTimeout, bufferSize, clientName)

  /** Fetches up to `fetchSize` bytes of messages starting at `offset`. */
  def getMessages(topic: String, partition: Int, offset: Long, fetchSize: Int): FetchResponse = {
    val request = new FetchRequestBuilder()
      .clientId(clientName)
      .addFetch(topic, partition, offset, fetchSize)
      .build()
    consumer.fetch(request)
  }

  /**
    * Requests a single offset before `whichTime` for the given partition.
    *
    * @return the offset, or `None` if the response carries an error or no offsets
    */
  def getLastOffset(topic: String, partition: Int, whichTime: Long): Option[Long] = {
    val topicAndPartition = TopicAndPartition(topic, partition)
    val requestInfo = new java.util.HashMap[TopicAndPartition, PartitionOffsetRequestInfo]()
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1))
    val request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion, clientName)
    val response = consumer.getOffsetsBefore(request)
    if (response.hasError) {
      println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition))
      None
    } else {
      // headOption instead of (0): an empty answer yields None rather than an exception.
      response.offsets(topic, partition).headOption
    }
  }

  /** Closes the underlying `SimpleConsumer`. */
  def close(): Unit = consumer.close()
}
开发者ID:massakai,项目名称:finagle-kafka-sample,代码行数:60,代码来源:KafkaConsumer.scala


示例3: KafkaOffsetGetterSpec

//设置package包名称以及导入依赖的类
package com.quantifind.kafka.core

import com.quantifind.kafka.core.KafkaOffsetGetter.GroupTopicPartition
import com.quantifind.utils.ZkUtilsWrapper
import kafka.api.{OffsetRequest, OffsetResponse, PartitionOffsetsResponse}
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.I0Itec.zkclient.ZkClient
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.{Matchers => MockitoMatchers, Mockito}
import org.scalatest._

/**
  * Verifies that `KafkaOffsetGetter.processPartition` combines the stored
  * consumer offset with the log size reported by the partition leader.
  */
class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {

  /** Mocks plus an offset getter whose consumer map already holds the mocked leader consumer. */
  trait Fixture {

    val testPartitionLeader = 1

    val mockedZkClient = Mockito.mock(classOf[ZkClient])
    val mockedZkUtil =  Mockito.mock(classOf[ZkUtilsWrapper])
    val mockedConsumer = Mockito.mock(classOf[SimpleConsumer])

    val offsetGetter = new KafkaOffsetGetter(mockedZkClient, mockedZkUtil)
    offsetGetter.consumerMap += (testPartitionLeader -> Some(mockedConsumer))
  }

  "KafkaOffsetGetter" should "be able to build offset data for given partition" in new Fixture {

    val group = "testgroup"
    val topic = "testtopic"
    val partition = 1

    // Register a committed offset of 100 for the group.
    val tp = TopicAndPartition(topic, partition)
    KafkaOffsetGetter.offsetMap +=
      (GroupTopicPartition(group, tp) -> OffsetAndMetadata(100, "meta", System.currentTimeMillis))

    // ZooKeeper lookup resolves the partition to the mocked leader.
    when(
      mockedZkUtil.getLeaderForPartition(
        MockitoMatchers.eq(mockedZkClient),
        MockitoMatchers.eq(topic),
        MockitoMatchers.eq(partition)))
      .thenReturn(Some(testPartitionLeader))

    // The leader answers every offset request with a log size of 102.
    val cannedResponse = OffsetResponse(1, Map(tp -> PartitionOffsetsResponse(0,Seq(102))))
    when(mockedConsumer.getOffsetsBefore(any[OffsetRequest])).thenReturn(cannedResponse)

    val info = offsetGetter
      .processPartition(group, topic, partition)
      .getOrElse(fail("Failed to build offset data"))

    info.topic shouldBe topic
    info.group shouldBe group
    info.partition shouldBe partition
    info.offset shouldBe 100
    info.logSize shouldBe 102
  }
}
开发者ID:ZhouGitHub1,项目名称:KafkaOffsetMonitor,代码行数:57,代码来源:KafkaOffsetGetterSpec.scala


示例4: kafkaHelper

//设置package包名称以及导入依赖的类
package me.jie.ksrdd

import kafka.api.TopicMetadataRequest
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.consumer.SimpleConsumer


/**
  * Leader discovery and consumer construction based on a [[kafkaConfig]].
  *
  * @param config supplies the broker list, socket settings, consumer id and retry policy
  */
class kafkaHelper(config: kafkaConfig) {
  import scala.util.control.NonFatal

  private val brokers = config.metadataBrokerList.split(",").map(kafkaBroker(_))
  private val socketTimeoutMs = config.socketTimeoutMs
  private val socketReceiveBufferBytes = config.socketReceiveBufferBytes
  private val consumerId = config.consumerId
  private val retries = config.retries
  private val refreshLeaderBackoffMs = config.refreshLeaderBackoffMs

  /**
    * Finds the current leader of `topicAndPartition`.
    *
    * Every configured broker is asked in turn; if none of them names a leader,
    * the round is repeated after `refreshLeaderBackoffMs`, for at most
    * `retries` rounds.
    *
    * @throws Exception with message "Find leader failed!" if no round succeeds
    */
  def findLeader(topicAndPartition: TopicAndPartition): kafkaBroker = {
    val rounds = (1 to retries).iterator.map { round =>
      val found = brokers.iterator
        .map(leaderVia(_, topicAndPartition))
        .collectFirst { case Some(leader) => leader }
      // Back off only if another round will follow.
      if (found.isEmpty && round < retries) Thread.sleep(refreshLeaderBackoffMs)
      found
    }

    rounds
      .collectFirst { case Some(leader) => leader }
      .getOrElse(throw new Exception("Find leader failed!"))
  }

  /**
    * Asks one broker for the leader of `topicAndPartition`.
    *
    * Metadata entries with an error code, for another topic or for another
    * partition are skipped. Any non-fatal failure while talking to the broker
    * counts as "leader unknown". The consumer is always closed.
    */
  private def leaderVia(broker: kafkaBroker, topicAndPartition: TopicAndPartition): Option[kafkaBroker] = {
    val consumer = new SimpleConsumer(broker.host, broker.port, socketTimeoutMs, socketReceiveBufferBytes, consumerId)
    try {
      val request = new TopicMetadataRequest(Seq(topicAndPartition.topic), 0)
      consumer.send(request).topicsMetadata.iterator
        .filter(meta => meta.errorCode == ErrorMapping.NoError && meta.topic == topicAndPartition.topic)
        .flatMap(_.partitionsMetadata)
        .filter(meta => meta.errorCode == ErrorMapping.NoError && meta.partitionId == topicAndPartition.partition)
        .map(_.leader)
        .collectFirst { case Some(leader) => kafkaBroker(leader.host, leader.port) }
    } catch {
      case NonFatal(_) => None
    } finally {
      consumer.close()
    }
  }

  /** Opens a new consumer connected to `broker`; the caller is responsible for closing it. */
  def buildConsumer(broker: kafkaBroker): SimpleConsumer = {
    val kafkaBroker(leaderHost, leaderPort) = broker
    new SimpleConsumer(leaderHost, leaderPort, socketTimeoutMs, socketReceiveBufferBytes, consumerId)
  }
}
开发者ID:JensenFeng,项目名称:KSRdd,代码行数:52,代码来源:kafkaHelper.scala


示例5: kafkaOffsetSeeker

//设置package包名称以及导入依赖的类
package me.jie.ksrdd

import java.util.Properties

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, PartitionOffsetsResponse}
import kafka.common.{ErrorMapping, TopicAndPartition}

import scala.annotation.tailrec


/**
  * Looks up offsets of a topic partition by timestamp, retrying against a
  * freshly discovered leader when the broker answers with an error.
  *
  * @param kafkaProps properties from which the [[kafkaConfig]] is built
  */
class kafkaOffsetSeeker(kafkaProps: Properties) {
  private val config = kafkaConfig(kafkaProps)

  private val kafkaHelper = new kafkaHelper(config)
  import kafkaHelper.{findLeader, buildConsumer}

  // Special timestamps passed to the offset request.
  private val earliest = -2
  private val latest = -1

  /**
    * Requests one offset before `timeMillis` for `topicAndPartition`.
    *
    * On an error response the leader is looked up again after
    * `config.refreshLeaderBackoffMs`; once `config.retries` retries are used up
    * the exception mapped to the broker error code is thrown.
    *
    * @return the first offset of the response, or `None` if the response has no offsets
    */
  def possibleOffsetBefore(topicAndPartition: TopicAndPartition, timeMillis: Long): Option[Long] = {
    val requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(timeMillis, 1))
    val request = OffsetRequest(requestInfo = requestInfo)

    @tailrec
    def fetchWithRetry(attempt: Int): Option[Long] = {
      val leader = buildConsumer(findLeader(topicAndPartition))
      // The consumer is closed even if the request or the partition lookup throws.
      // The try/finally ends before the recursive call, so @tailrec still holds.
      val PartitionOffsetsResponse(error, offsets) =
        try leader.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
        finally leader.close()

      if (error == ErrorMapping.NoError) {
        offsets.headOption
      } else if (attempt >= config.retries) {
        throw ErrorMapping.exceptionFor(error)
      } else {
        Thread.sleep(config.refreshLeaderBackoffMs)
        fetchWithRetry(attempt + 1)
      }
    }

    fetchWithRetry(0)
  }

  /** Offset lookup using the `earliest` timestamp marker. */
  def earliestOffset(topicAndPartition: TopicAndPartition): Option[Long] =
    possibleOffsetBefore(topicAndPartition, earliest)

  /** Offset lookup using the `latest` timestamp marker. */
  def latestOffset(topicAndPartition: TopicAndPartition): Option[Long] =
    possibleOffsetBefore(topicAndPartition, latest)
}
开发者ID:JensenFeng,项目名称:KSRdd,代码行数:47,代码来源:kafkaOffsetSeeker.scala


示例6: FetchDataProcessorCache

//设置package包名称以及导入依赖的类
package com.box.castle.router.kafkadispatcher.cache

import com.box.castle.batch.CastleMessageBatch
import kafka.common.TopicAndPartition


/**
  * Immutable collection of per-partition caches sharing one byte budget.
  *
  * Every modifying operation returns a new instance; the budget is divided
  * evenly between the topic partitions that currently have a cache.
  */
private[kafkadispatcher] class FetchDataProcessorCache private(
    cacheMap: Map[TopicAndPartition, Cache], maxSizeInBytes: Long) {

  require(maxSizeInBytes > 0, "Cache must have more than 0 bytes to use")

  /**
    * Returns a copy of this cache that additionally holds `batch`.
    *
    * A partition seen for the first time gets its own cache, and all caches
    * are resized so that the total budget is split evenly again.
    */
  def add(topicAndPartition: TopicAndPartition, batch: CastleMessageBatch): FetchDataProcessorCache =
    if (cacheMap.contains(topicAndPartition)) {
      val grown = cacheMap(topicAndPartition).add(batch)
      new FetchDataProcessorCache(cacheMap.updated(topicAndPartition, grown), maxSizeInBytes)
    } else {
      // One more partition shares the budget from now on.
      val shareOfNewEntry = maxSizeInBytes / (cacheMap.size + 1)
      val freshCache = Cache(batch, shareOfNewEntry)
      FetchDataProcessorCache.resize(cacheMap.updated(topicAndPartition, freshCache), maxSizeInBytes)
    }

  /** Looks up the batch for `offset` in the cache of `topicAndPartition`, if both exist. */
  def get(topicAndPartition: TopicAndPartition, offset: Long): Option[CastleMessageBatch] =
    for {
      partitionCache <- cacheMap.get(topicAndPartition)
      batch <- partitionCache.get(offset)
    } yield batch

  /** Returns this instance if the budget is unchanged, otherwise a resized copy. */
  def setMaxSizeInBytes(newMaxSizeInBytes: Long): FetchDataProcessorCache =
    if (newMaxSizeInBytes != maxSizeInBytes) FetchDataProcessorCache.resize(cacheMap, newMaxSizeInBytes)
    else this

  override def toString: String = cacheMap.toString()
}

private[kafkadispatcher]
object FetchDataProcessorCache {

  /** Creates an empty cache with the given total byte budget. */
  def apply(maxSizeInBytes: Long) = new FetchDataProcessorCache(Map.empty[TopicAndPartition, Cache], maxSizeInBytes)

  /**
    * Builds a cache in which every entry of `cacheMap` is limited to an equal
    * share of `maxSizeInBytes`.
    */
  private def resize(cacheMap: Map[TopicAndPartition, Cache], maxSizeInBytes: Long): FetchDataProcessorCache =
    if (cacheMap.isEmpty) {
      // Nothing to resize: start over with an empty map.
      new FetchDataProcessorCache(Map.empty[TopicAndPartition, Cache], maxSizeInBytes)
    } else {
      val budgetPerEntry = maxSizeInBytes / cacheMap.size
      val resized = cacheMap.map { case (key, entry) =>
        key -> entry.setMaxSizeInBytes(budgetPerEntry)
      }
      new FetchDataProcessorCache(resized, maxSizeInBytes)
    }
}
开发者ID:Box-Castle,项目名称:router,代码行数:57,代码来源:FetchDataProcessorCache.scala


示例7: RunningEmbeddedKafka

//设置package包名称以及导入依赖的类
package com.pygmalios.sparkCheckpointExperience.kafka.embedded

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.Logging

import scala.concurrent.duration._

/**
  * Handle on a running embedded Kafka: publishes messages and logs the offset
  * range of partition 0 of the topic given at construction time.
  *
  * @param producer producer used for publishing
  * @param consumer consumer used for the offset requests
  * @param topic    topic whose partition 0 is inspected after each publish
  */
class RunningEmbeddedKafka(producer: KafkaProducer[String, String],
                           consumer: SimpleConsumer,
                           topic: String) extends Logging {
  val topicAndPartition = TopicAndPartition(topic, 0)

  /** Publishes `message` without a key. */
  def publish(topic: String, message: String): Unit = publish(topic, null, message)

  /**
    * Sends the record, waits up to three seconds for the acknowledgement and
    * then logs the current earliest and latest offsets.
    */
  def publish(topic: String, key: String, message: String): Unit = {
    val record = new ProducerRecord[String, String](topic, key, message)
    producer.send(record).get(3, SECONDS)

    val latest    = getOffset(OffsetRequest.LatestTime)
    val earliest  = getOffset(OffsetRequest.EarliestTime)
    log.info(f"$topic [$earliest%3d:$latest%3d]: $key%3s -> $message%3s")
  }

  /** Returns the first offset the broker reports for `time` on the tracked partition. */
  private def getOffset(time: Long): Long = {
    val requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(time, 100))
    val partitionOffsets = consumer
      .getOffsetsBefore(OffsetRequest(requestInfo))
      .partitionErrorAndOffsets(topicAndPartition)
    partitionOffsets.offsets.head
  }
}
开发者ID:pygmalios,项目名称:spark-checkpoint-experience,代码行数:31,代码来源:RunningEmbeddedKafka.scala



注:本文中的kafka.common.TopicAndPartition类示例整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala KryoRegistrator类代码示例发布时间:2022-05-23
下一篇:
Scala SVMWithSGD类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap