本文整理汇总了 Scala 中 kafka.common.TopicAndPartition 类的典型用法代码示例。如果您正苦于以下问题:Scala TopicAndPartition 类的具体用法?Scala TopicAndPartition 怎么用?Scala TopicAndPartition 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了 TopicAndPartition 类的 7 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Scala 代码示例。
示例1: OffsetManagement
//设置package包名称以及导入依赖的类
package org.hpi.esb.util
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.client.ClientUtils
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.api.TopicMetadata
import org.hpi.esb.commons.util.Logging
/** Helpers for reading partition offsets from Kafka through the legacy `SimpleConsumer` API. */
object OffsetManagement extends Logging {

  /**
    * Looks up the topic metadata and returns the offset reported by the partition leader
    * for an offset request with time `-1` and a single requested offset.
    *
    * The JVM is terminated via `sys.exit(1)` when the topic metadata is invalid, the
    * partition does not exist, or the partition has no leader.
    *
    * @param topic     topic to inspect
    * @param partition partition id within `topic`
    * @return the last offset contained in the leader's response
    */
  def getNumberOfMessages(topic: String, partition: Int): Long = {
    val clientId = "GetOffset"
    val topicsMetadata = getMetaData(topic, clientId)
    getLatestOffset(topicsMetadata, topic, partition, clientId)
  }

  /**
    * Fetches metadata for `topic` from a hard-coded broker list. Logs an error and exits
    * the JVM unless exactly one metadata entry, named after `topic`, is returned.
    */
  private def getMetaData(topic: String, clientId: String) = {
    val brokerList = "192.168.30.208:9092, 192.168.30.207:9092, 192.168.30.141:9092"
    val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
    val maxWaitMs = 1000
    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
    if (topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
      logger.error(s"Error: no valid topic metadata for topic: $topic, probably the topic does not exist, run kafka-list-topic.sh to verify")
      sys.exit(1)
    }
    topicsMetadata
  }

  /**
    * Sends an offset request to the leader of `partition` and returns the last offset of
    * the response. Exits the JVM when the partition or its leader cannot be found.
    */
  private def getLatestOffset(topicsMetadata: Seq[TopicMetadata], topic: String, partition: Int, clientId: String) = {
    val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partition)
    val time = -1
    val nOffsets = 1
    partitionMetadataOpt match {
      case Some(metadata) =>
        metadata.leader match {
          case Some(leader) =>
            val timeout = 10000
            val bufferSize = 100000
            val consumer = new SimpleConsumer(leader.host, leader.port, timeout, bufferSize, clientId)
            // Close the consumer on every path; previously it was left open after each call.
            try {
              val topicAndPartition = TopicAndPartition(topic, partition)
              val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
              val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
              offsets.last
            } finally {
              consumer.close()
            }
          case None => logger.error(s"Error: partition $partition does not have a leader. Skip getting offsets"); sys.exit(1)
        }
      case None => logger.error(s"Error: partition $partition does not exist"); sys.exit(1)
    }
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:60,代码来源:OffsetManagement.scala
示例2: KafkaConsumer
//设置package包名称以及导入依赖的类
package jp.gr.java_conf.massakai.kafka
import java.util.Collections
import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder}
import kafka.common.TopicAndPartition
import kafka.javaapi._
import kafka.javaapi.consumer.SimpleConsumer
import collection.JavaConversions._
object KafkaConsumer {

  /**
    * Asks each bootstrap broker in turn for the metadata of `topic` and returns the
    * metadata of the first partition whose id equals `partition`.
    *
    * Brokers are queried lazily: once one broker yields a match, the remaining brokers
    * are not contacted. Exceptions raised while talking to a broker propagate to the caller.
    *
    * @param bootstraps brokers to query, in order
    * @param topic      topic whose metadata is requested
    * @param partition  partition id to look for
    * @return the matching partition metadata, or `None` if no broker reports it
    */
  def findLeader(bootstraps: Seq[Broker], topic: String, partition: Int): Option[PartitionMetadata] = {
    val topics = Collections.singletonList(topic)

    // Queries a single broker; the lookup consumer is always closed afterwards
    // (previously one socket per bootstrap broker was left open).
    def lookup(bootstrap: Broker): Option[PartitionMetadata] = {
      val consumer = new SimpleConsumer(bootstrap.host, bootstrap.port, 100000, 64 * 1024, "leaderLookup")
      try {
        val resp = consumer.send(new TopicMetadataRequest(topics))
        val partitions = for {
          topicMetadata <- asScalaBuffer(resp.topicsMetadata).iterator
          partitionMetadata <- asScalaBuffer(topicMetadata.partitionsMetadata).iterator
        } yield partitionMetadata
        partitions.find(_.partitionId == partition)
      } finally {
        consumer.close()
      }
    }

    // Iterator.map is lazy, so collectFirst stops at the first broker that knows the
    // partition; this replaces the non-local `return` out of the nested loops.
    bootstraps.iterator.map(lookup).collectFirst { case Some(found) => found }
  }
}
/**
  * Thin wrapper around a `SimpleConsumer` connected to a single broker.
  *
  * The underlying consumer is opened on construction; call [[close]] when the wrapper
  * is no longer needed.
  *
  * @param leadBroker host of the broker to connect to
  * @param port       port of the broker
  * @param soTimeout  socket timeout passed to the `SimpleConsumer`
  * @param bufferSize buffer size passed to the `SimpleConsumer`
  * @param clientName client id used for the consumer and for every request
  */
case class KafkaConsumer(leadBroker: String, port: Int, soTimeout: Int, bufferSize: Int, clientName: String) {
  val consumer = new SimpleConsumer(leadBroker, port, soTimeout, bufferSize, clientName)

  /** Fetches up to `fetchSize` bytes from `topic`/`partition`, starting at `offset`. */
  def getMessages(topic: String, partition: Int, offset: Long, fetchSize: Int): FetchResponse = {
    val request = new FetchRequestBuilder()
      .clientId(clientName)
      .addFetch(topic, partition, offset, fetchSize)
      .build()
    consumer.fetch(request)
  }

  /**
    * Requests a single offset for `topic`/`partition` relative to `whichTime`.
    *
    * @return the first offset of the response, or `None` when the response reports an
    *         error (the error code is printed) or contains no offsets
    */
  def getLastOffset(topic: String, partition: Int, whichTime: Long): Option[Long] = {
    val topicAndPartition = new TopicAndPartition(topic, partition)
    val requestInfo = new java.util.HashMap[TopicAndPartition, PartitionOffsetRequestInfo]()
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1))
    val request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion, clientName)
    val response = consumer.getOffsetsBefore(request)
    if (response.hasError) {
      println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition))
      None
    } else {
      // headOption instead of (0): an empty offsets array yields None rather than
      // an ArrayIndexOutOfBoundsException.
      response.offsets(topic, partition).headOption
    }
  }

  /** Closes the underlying `SimpleConsumer`. */
  def close(): Unit = consumer.close()
}
开发者ID:massakai,项目名称:finagle-kafka-sample,代码行数:60,代码来源:KafkaConsumer.scala
示例3: KafkaOffsetGetterSpec
//设置package包名称以及导入依赖的类
package com.quantifind.kafka.core
import com.quantifind.kafka.core.KafkaOffsetGetter.GroupTopicPartition
import com.quantifind.utils.ZkUtilsWrapper
import kafka.api.{OffsetRequest, OffsetResponse, PartitionOffsetsResponse}
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.I0Itec.zkclient.ZkClient
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.{Matchers => MockitoMatchers, Mockito}
import org.scalatest._
/** Verifies that `KafkaOffsetGetter.processPartition` combines the stored offset with the broker log size. */
class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {

  /** Shared setup: a `KafkaOffsetGetter` backed by mocks, with a mocked consumer registered for leader 1. */
  trait Fixture {
    val mockedZkClient = Mockito.mock(classOf[ZkClient])
    val mockedZkUtil = Mockito.mock(classOf[ZkUtilsWrapper])
    val mockedConsumer = Mockito.mock(classOf[SimpleConsumer])

    val testPartitionLeader = 1

    val offsetGetter = new KafkaOffsetGetter(mockedZkClient, mockedZkUtil)
    offsetGetter.consumerMap += (testPartitionLeader -> Some(mockedConsumer))
  }

  "KafkaOffsetGetter" should "be able to build offset data for given partition" in new Fixture {
    val groupName = "testgroup"
    val topicName = "testtopic"
    val partitionId = 1

    // Register a committed offset of 100 for the group/topic/partition under test.
    val topicAndPartition = TopicAndPartition(topicName, partitionId)
    val committed = OffsetAndMetadata(100, "meta", System.currentTimeMillis)
    KafkaOffsetGetter.offsetMap += (GroupTopicPartition(groupName, topicAndPartition) -> committed)

    // The mocked consumer answers every offset request with a single offset, 102.
    val offsetResponse = OffsetResponse(1, Map(topicAndPartition -> PartitionOffsetsResponse(0, Seq(102))))

    when(
      mockedZkUtil.getLeaderForPartition(
        MockitoMatchers.eq(mockedZkClient),
        MockitoMatchers.eq(topicName),
        MockitoMatchers.eq(partitionId)))
      .thenReturn(Some(testPartitionLeader))
    when(mockedConsumer.getOffsetsBefore(any[OffsetRequest])).thenReturn(offsetResponse)

    val offsetInfo = offsetGetter
      .processPartition(groupName, topicName, partitionId)
      .getOrElse(fail("Failed to build offset data"))

    offsetInfo.topic shouldBe topicName
    offsetInfo.group shouldBe groupName
    offsetInfo.partition shouldBe partitionId
    offsetInfo.offset shouldBe 100
    offsetInfo.logSize shouldBe 102
  }
}
开发者ID:ZhouGitHub1,项目名称:KafkaOffsetMonitor,代码行数:57,代码来源:KafkaOffsetGetterSpec.scala
示例4: kafkaHelper
//设置package包名称以及导入依赖的类
package me.jie.ksrdd
import kafka.api.TopicMetadataRequest
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.consumer.SimpleConsumer
/**
  * Leader discovery and consumer construction on top of the `SimpleConsumer` API,
  * configured from a `kafkaConfig`.
  */
class kafkaHelper(config: kafkaConfig) {
  private val brokers = config.metadataBrokerList.split(",").map(kafkaBroker(_))
  private val socketTimeoutMs = config.socketTimeoutMs
  private val socketReceiveBufferBytes = config.socketReceiveBufferBytes
  private val consumerId = config.consumerId
  private val retries = config.retries
  private val refreshLeaderBackoffMs = config.refreshLeaderBackoffMs

  /**
    * Finds the leader broker of `topicAndPartition`.
    *
    * Every configured broker is queried in order; the first leader reported wins. When
    * no broker reports a leader, the method sleeps for `refreshLeaderBackoffMs` and
    * starts another round, for at most `retries` rounds.
    *
    * @throws Exception with message "Find leader failed!" when all rounds are exhausted
    */
  def findLeader(topicAndPartition: TopicAndPartition): kafkaBroker = {
    @scala.annotation.tailrec
    def attempt(roundsLeft: Int): kafkaBroker =
      if (roundsLeft <= 0) throw new Exception("Find leader failed!")
      else
        brokers.iterator
          .map(leaderReportedBy(_, topicAndPartition))
          .collectFirst { case Some(leader) => leader } match {
          case Some(leader) => leader
          case None =>
            Thread.sleep(refreshLeaderBackoffMs)
            attempt(roundsLeft - 1)
        }

    attempt(retries)
  }

  /**
    * Asks a single broker for the metadata of the topic and extracts the leader of the
    * requested partition. Returns `None` when the broker reports no usable leader or
    * the request fails with a non-fatal error.
    */
  private def leaderReportedBy(broker: kafkaBroker, topicAndPartition: TopicAndPartition): Option[kafkaBroker] = {
    val consumer = new SimpleConsumer(broker.host, broker.port, socketTimeoutMs, socketReceiveBufferBytes, consumerId)
    try {
      val response = consumer.send(new TopicMetadataRequest(Seq(topicAndPartition.topic), 0))
      // Total filters instead of partial-function literals: the old flatMap/map cases
      // threw MatchError on the first non-matching topic or partition, which the
      // catch-all then turned into "no leader found".
      response.topicsMetadata.toStream
        .filter(meta => meta.errorCode == ErrorMapping.NoError && meta.topic == topicAndPartition.topic)
        .flatMap(_.partitionsMetadata)
        .filter(meta => meta.errorCode == ErrorMapping.NoError && meta.partitionId == topicAndPartition.partition)
        .flatMap(_.leader.toList)
        .headOption
        .map(leader => kafkaBroker(leader.host, leader.port))
    } catch {
      // Only non-fatal failures mean "try the next broker"; fatal errors and
      // interrupts propagate.
      case scala.util.control.NonFatal(_) => None
    } finally {
      consumer.close()
    }
  }

  /** Creates a new `SimpleConsumer` connected to `broker` using the configured socket settings. */
  def buildConsumer(broker: kafkaBroker): SimpleConsumer = {
    val kafkaBroker(leaderHost, leaderPort) = broker
    new SimpleConsumer(leaderHost, leaderPort, socketTimeoutMs, socketReceiveBufferBytes, consumerId)
  }
}
开发者ID:JensenFeng,项目名称:KSRdd,代码行数:52,代码来源:kafkaHelper.scala
示例5: kafkaOffsetSeeker
//设置package包名称以及导入依赖的类
package me.jie.ksrdd
import java.util.Properties
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, PartitionOffsetsResponse}
import kafka.common.{ErrorMapping, TopicAndPartition}
import scala.annotation.tailrec
/**
  * Looks up partition offsets by timestamp, using a `kafkaHelper` built from `kafkaProps`
  * to locate and connect to the partition leader.
  */
class kafkaOffsetSeeker(kafkaProps: Properties) {
  private val config = kafkaConfig(kafkaProps)
  private val kafkaHelper = new kafkaHelper(config)
  import kafkaHelper.{findLeader, buildConsumer}

  // Time values passed to possibleOffsetBefore by earliestOffset / latestOffset.
  private val earliest = -2
  private val latest = -1

  /**
    * Requests a single offset of `topicAndPartition` for `timeMillis` from the current leader.
    *
    * On an error response the leader is looked up again after sleeping
    * `config.refreshLeaderBackoffMs`; once the retry counter reaches `config.retries`
    * the exception mapped from the error code is thrown instead.
    *
    * @return the first offset of the response, or `None` if the response holds no offsets
    */
  def possibleOffsetBefore(topicAndPartition: TopicAndPartition, timeMillis: Long): Option[Long] = {
    val requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(timeMillis, 1))
    val request = OffsetRequest(requestInfo = requestInfo)

    @tailrec
    def fetchWithRetry(retries: Int): Option[Long] = {
      val leader = buildConsumer(findLeader(topicAndPartition))
      // Close the consumer even when the request or the response lookup throws. The
      // try/finally is kept away from the recursive call so the method stays tail-recursive.
      val PartitionOffsetsResponse(error, offsets) =
        try leader.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
        finally leader.close()

      (error, retries) match {
        case (ErrorMapping.NoError, _) => offsets.headOption
        case (_, config.retries) => throw ErrorMapping.exceptionFor(error)
        case (_, _) => Thread.sleep(config.refreshLeaderBackoffMs); fetchWithRetry(retries + 1)
      }
    }

    fetchWithRetry(0)
  }

  /** Offset lookup using the `earliest` time value (-2). */
  def earliestOffset(topicAndPartition: TopicAndPartition): Option[Long] =
    possibleOffsetBefore(topicAndPartition, earliest)

  /** Offset lookup using the `latest` time value (-1). */
  def latestOffset(topicAndPartition: TopicAndPartition): Option[Long] =
    possibleOffsetBefore(topicAndPartition, latest)
}
开发者ID:JensenFeng,项目名称:KSRdd,代码行数:47,代码来源:kafkaOffsetSeeker.scala
示例6: FetchDataProcessorCache
//设置package包名称以及导入依赖的类
package com.box.castle.router.kafkadispatcher.cache
import com.box.castle.batch.CastleMessageBatch
import kafka.common.TopicAndPartition
/**
  * Immutable collection of per-partition caches sharing a total byte budget.
  * Every modifying operation returns a new instance.
  *
  * @param cacheMap       cache held for each topic and partition
  * @param maxSizeInBytes total budget; must be greater than zero
  */
private[kafkadispatcher] class FetchDataProcessorCache private(
    cacheMap: Map[TopicAndPartition, Cache], maxSizeInBytes: Long) {

  require(maxSizeInBytes > 0, "Cache must have more than 0 bytes to use")

  /**
    * Returns a cache that additionally contains `batch` for `topicAndPartition`.
    * Adding the first batch of a new partition re-divides the budget across all partitions.
    */
  def add(topicAndPartition: TopicAndPartition, batch: CastleMessageBatch): FetchDataProcessorCache =
    cacheMap.get(topicAndPartition).map { existing =>
      new FetchDataProcessorCache(cacheMap.updated(topicAndPartition, existing.add(batch)), maxSizeInBytes)
    }.getOrElse {
      // A new partition shrinks every existing share, so all caches are resized.
      val created = Cache(batch, maxSizeInBytes / (cacheMap.size + 1))
      FetchDataProcessorCache.resize(cacheMap.updated(topicAndPartition, created), maxSizeInBytes)
    }

  /** Looks up `offset` in the cache of `topicAndPartition`, if that partition has one. */
  def get(topicAndPartition: TopicAndPartition, offset: Long): Option[CastleMessageBatch] =
    for {
      partitionCache <- cacheMap.get(topicAndPartition)
      batch <- partitionCache.get(offset)
    } yield batch

  /** Returns this instance when the budget is unchanged, otherwise a copy resized to the new budget. */
  def setMaxSizeInBytes(newMaxSizeInBytes: Long): FetchDataProcessorCache =
    if (newMaxSizeInBytes != maxSizeInBytes) FetchDataProcessorCache.resize(cacheMap, newMaxSizeInBytes)
    else this

  override def toString: String = cacheMap.toString()
}
/** Factory and resizing helper for [[FetchDataProcessorCache]]. */
private[kafkadispatcher]
object FetchDataProcessorCache {

  /** Creates an empty cache with the given total budget. */
  def apply(maxSizeInBytes: Long) = new FetchDataProcessorCache(Map.empty[TopicAndPartition, Cache], maxSizeInBytes)

  /**
    * Builds a cache in which every partition receives an equal share
    * (`maxSizeInBytes / cacheMap.size`, integer division) of the total budget.
    */
  private def resize(cacheMap: Map[TopicAndPartition, Cache], maxSizeInBytes: Long): FetchDataProcessorCache =
    if (cacheMap.isEmpty) {
      // Nothing to resize: return an empty cache with the requested budget.
      new FetchDataProcessorCache(Map.empty[TopicAndPartition, Cache], maxSizeInBytes)
    } else {
      val share = maxSizeInBytes / cacheMap.size
      val resized = for ((partitionKey, partitionCache) <- cacheMap)
        yield partitionKey -> partitionCache.setMaxSizeInBytes(share)
      new FetchDataProcessorCache(resized, maxSizeInBytes)
    }
}
开发者ID:Box-Castle,项目名称:router,代码行数:57,代码来源:FetchDataProcessorCache.scala
示例7: RunningEmbeddedKafka
//设置package包名称以及导入依赖的类
package com.pygmalios.sparkCheckpointExperience.kafka.embedded
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.Logging
import scala.concurrent.duration._
/**
  * Handle on a running embedded Kafka: publishes messages through `producer` and reads
  * the offsets of partition 0 of `topic` through `consumer`.
  */
class RunningEmbeddedKafka(producer: KafkaProducer[String, String],
                           consumer: SimpleConsumer,
                           topic: String) extends Logging {

  val topicAndPartition = TopicAndPartition(topic, 0)

  /** Publishes `message` without a key. */
  def publish(topic: String, message: String): Unit = publish(topic, null, message)

  /**
    * Sends the record, waits up to 3 seconds for the send to complete, then logs the
    * earliest and latest offsets of `topicAndPartition`.
    */
  def publish(topic: String, key: String, message: String): Unit = {
    val record = new ProducerRecord[String, String](topic, key, message)
    producer.send(record).get(3, SECONDS)

    val latest = getOffset(OffsetRequest.LatestTime)
    val earliest = getOffset(OffsetRequest.EarliestTime)
    log.info(f"$topic [$earliest%3d:$latest%3d]: $key%3s -> $message%3s")
  }

  /** Requests up to 100 offsets before `time` and returns the first one of the response. */
  private def getOffset(time: Long): Long = {
    val requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(time, 100))
    val response = consumer.getOffsetsBefore(OffsetRequest(requestInfo))
    val partitionOffsets = response.partitionErrorAndOffsets(topicAndPartition)
    partitionOffsets.offsets.head
  }
}
开发者ID:pygmalios,项目名称:spark-checkpoint-experience,代码行数:31,代码来源:RunningEmbeddedKafka.scala
注:本文中的 kafka.common.TopicAndPartition 类示例整理自 Github/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论