本文整理汇总了Scala中org.apache.kafka.common.TopicPartition类的典型用法代码示例。如果您正苦于以下问题：Scala TopicPartition类的具体用法？Scala TopicPartition怎么用？Scala TopicPartition使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了TopicPartition类的12个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: ProcessingKafkaApplication
//设置package包名称以及导入依赖的类
package com.packt.chapter8
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import scala.concurrent.duration._
/**
 * Small Akka Streams application that both writes to and reads from one Kafka topic.
 *
 * A single closed graph is built with two independent branches:
 *  - a tick source emitting a fixed greeting every five seconds, converted to a
 *    `ProducerRecord` and pushed into a Kafka producer sink;
 *  - a Kafka consumer source assigned to one partition of the same topic whose
 *    record values are printed to standard output.
 */
object ProcessingKafkaApplication extends App {
  // Implicits carry explicit types so that inference changes cannot silently alter them.
  implicit val actorSystem: ActorSystem = ActorSystem("SimpleStream")
  implicit val actorMaterializer: ActorMaterializer = ActorMaterializer()

  val bootstrapServers = "localhost:9092"
  val kafkaTopic = "akka_streams_topic"
  val partition = 0

  // Manual assignment of exactly one topic partition.
  val subscription = Subscriptions.assignment(new TopicPartition(kafkaTopic, partition))

  val consumerSettings = ConsumerSettings(actorSystem, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("akka_streams_group")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val producerSettings = ProducerSettings(actorSystem, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

  val runnableGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Method-call syntax (`0.seconds`) instead of the postfix form (`0 seconds`),
    // which needs `scala.language.postfixOps` that this file does not import.
    val tickSource = Source.tick(0.seconds, 5.seconds, "Hello from Akka Streams using Kafka!")
    val kafkaSource = Consumer.plainSource(consumerSettings, subscription)
    val kafkaSink = Producer.plainSink(producerSettings)
    val printlnSink = Sink.foreach(println)

    val mapToProducerRecord = Flow[String].map(elem => new ProducerRecord[Array[Byte], String](kafkaTopic, elem))
    val mapFromConsumerRecord = Flow[ConsumerRecord[Array[Byte], String]].map(record => record.value())

    // Producing branch: greeting -> ProducerRecord -> Kafka.
    tickSource ~> mapToProducerRecord ~> kafkaSink
    // Consuming branch: Kafka -> record value -> stdout.
    kafkaSource ~> mapFromConsumerRecord ~> printlnSink

    ClosedShape
  })

  runnableGraph.run()
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:51,代码来源:ProcessingKafkaApplication.scala
示例2: TopicHandler
//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.kafka
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.hpi.esb.util.OffsetManagement
/**
 * Bundles a Kafka topic name with the number of messages reported for it and a
 * source that streams its records.
 *
 * @param topicName        name of the Kafka topic
 * @param numberOfMessages value returned by `OffsetManagement.getNumberOfMessages` for the topic
 * @param topicSource      Akka Streams source reading the topic's records
 */
case class TopicHandler(topicName: String, numberOfMessages: Long, topicSource: Source[ConsumerRecord[String, String], Consumer.Control])

object TopicHandler {

  /** Broker list used when the caller does not supply one (the previously hard-coded value). */
  val DefaultBootstrapServers = "192.168.30.208:9092,192.168.30.207:9092,192.168.30.141:9092"

  /** Partition read when the caller does not supply one (the previously hard-coded value). */
  val DefaultPartition = 0

  /**
   * Builds a [[TopicHandler]] for `topicName`.
   *
   * A random UUID is used as the consumer group id, so every handler gets its own group.
   *
   * @param topicName        topic to read
   * @param system           actor system the consumer settings are derived from
   * @param bootstrapServers comma-separated broker list; defaults to [[DefaultBootstrapServers]]
   * @param partition        partition to read; defaults to [[DefaultPartition]]
   * @return a handler holding the topic name, its message count and a record source
   */
  def create(topicName: String,
             system: ActorSystem,
             bootstrapServers: String = DefaultBootstrapServers,
             partition: Int = DefaultPartition): TopicHandler = {
    val uuid = java.util.UUID.randomUUID.toString

    val consumerSettings: ConsumerSettings[String, String] =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers(bootstrapServers)
        .withGroupId(uuid)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Int.MaxValue.toString)
        .withProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, Int.MaxValue.toString)
        .withProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "20485000")

    val topicSource = createSource(consumerSettings, topicName, partition)
    val numberOfMessages = OffsetManagement.getNumberOfMessages(topicName, partition)

    TopicHandler(topicName, numberOfMessages, topicSource)
  }

  /**
   * Creates a plain consumer source assigned to one partition, starting at offset 0.
   *
   * @param consumerSettings settings for the underlying Kafka consumer
   * @param topicName        topic to read
   * @param partition        partition of the topic to read
   * @return a source of consumer records with its control handle
   */
  def createSource(consumerSettings: ConsumerSettings[String, String],
                   topicName: String,
                   partition: Int): Source[ConsumerRecord[String, String], Consumer.Control] = {
    val subscription = Subscriptions.assignmentWithOffset(
      new TopicPartition(topicName, partition) -> 0L
    )
    Consumer.plainSource(consumerSettings, subscription)
  }
}
开发者ID:BenReissaus,项目名称:EnterpriseStreamingBenchmark,代码行数:42,代码来源:TopicHandler.scala
示例3: ConsumerLoop
//设置package包名称以及导入依赖的类
package org.hpi.esb.datavalidator.consumer
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.hpi.esb.datavalidator.config.KafkaConsumerConfig
import org.hpi.esb.datavalidator.util.Logging
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
/**
 * Runnable that reads partition 0 of `topic` from the beginning and appends every
 * record to `results`.
 *
 * The loop stops once three polls in total have returned no records. The counter is
 * cumulative (it is never reset when records arrive), matching the log message
 * "for the third time".
 *
 * @param topic   Kafka topic to read
 * @param config  consumer configuration values copied into the Kafka properties
 * @param results buffer that receives every consumed record; it is mutated from the
 *                thread executing `run()`
 */
class ConsumerLoop(topic: String, config: KafkaConsumerConfig, results: ListBuffer[ConsumerRecord[String, String]]) extends Runnable with Logging {

  private val props = createConsumerProps()
  // Typed consumer: removes the need to cast the result of poll().
  private val consumer = new KafkaConsumer[String, String](props)
  initializeConsumer()

  /** Polls until the stream is considered finished, then closes the consumer. */
  override def run(): Unit = {
    var running = true
    var zeroCount = 0
    try {
      while (running) {
        val records = consumer.poll(1000)
        if (records.count() == 0) {
          logger.debug(s"Received 0 records from Kafka.")
          zeroCount += 1
          if (zeroCount == 3) {
            logger.debug("Received 0 records from Kafka for the third time. We assume the stream has finished and terminate.")
            running = false
          }
        }
        // Explicit Java iterator instead of the deprecated implicit JavaConversions.
        val it = records.iterator()
        while (it.hasNext) {
          results.append(it.next())
        }
      }
    } finally {
      // Release the consumer even when poll() or the append throws.
      consumer.close()
    }
  }

  /** Assigns partition 0 of the topic and rewinds it to its first offset. */
  private def initializeConsumer(): Unit = {
    val topicPartitions = java.util.Collections.singletonList(new TopicPartition(topic, 0))
    consumer.assign(topicPartitions)
    consumer.seekToBeginning(topicPartitions)
  }

  /** Translates `config` into the properties expected by `KafkaConsumer`. */
  private def createConsumerProps(): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, s"Validator")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.autoCommit)
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, config.autoCommitInterval)
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.sessionTimeout)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.keyDeserializerClass)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.valueDeserializerClass)
    props
  }
}
开发者ID:BenReissaus,项目名称:ESB-DataValidator,代码行数:62,代码来源:ConsumerLoop.scala
示例4: PlainSourceConsumerMain
//设置package包名称以及导入依赖的类
package com.example.consumer
import java.util.concurrent.atomic.AtomicLong
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
/**
 * Consumes partition 0 of "topic1" starting at an offset loaded from an external
 * store ([[DB]]), and hands every record back to that store.
 */
object PlainSourceConsumerMain extends App {
  implicit val system = ActorSystem("PlainSourceConsumerMain")
  implicit val materializer = ActorMaterializer()

  //TODO: move to configuration application.conf
  val consumerSettings =
    ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("PlainSourceConsumer")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val db = new DB

  // Once the stored offset is available, start streaming from it.
  for (fromOffset <- db.loadOffset()) {
    val startingPoint = new TopicPartition("topic1", 0) -> fromOffset
    Consumer
      .plainSource(consumerSettings, Subscriptions.assignmentWithOffset(startingPoint))
      .mapAsync(1)(db.save) // at most one save in flight at a time
      .runWith(Sink.ignore)
  }
}
//External Offset Storage
/**
 * In-memory stand-in for an external offset store.
 *
 * Holds a single offset, initialised to 2, that is overwritten with the offset of
 * every record passed to `save`.
 */
class DB {
  private val lastOffset = new AtomicLong(2)

  /** Already-completed acknowledgement returned by the write operations. */
  private def acknowledged: Future[Done] = Future.successful(Done)

  /** Prints the record value and remembers the record's offset. */
  def save(record: ConsumerRecord[Array[Byte], String]): Future[Done] = {
    println(s"DB.save: ${record.value}")
    lastOffset.set(record.offset)
    acknowledged
  }

  /** Returns the currently stored offset as a completed future. */
  def loadOffset(): Future[Long] = {
    val current = lastOffset.get
    Future.successful(current)
  }

  /** Prints the given data; nothing is stored. */
  def update(data: String): Future[Done] = {
    println(s"DB.update: $data")
    acknowledged
  }
}
开发者ID:makersu,项目名称:reactive-kafka-scala-example,代码行数:62,代码来源:PlainSourceConsumerMain.scala
示例5: ConsumerExample
//设置package包名称以及导入依赖的类
package com.malaska.kafka.training
import java.util
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
/**
 * Command-line Kafka consumer that prints every record of one topic.
 *
 * Expected arguments: `<kafkaServerURL> <kafkaServerPort> <topic>`.
 */
object ConsumerExample {

  /**
   * Subscribes to the topic and polls until the process is stopped or an exception
   * escapes the loop.
   *
   * @param args broker host, broker port and topic name, in that order
   * @throws IllegalArgumentException if fewer than three arguments are supplied
   */
  def main(args: Array[String]): Unit = {
    // Fail with a usage hint instead of an ArrayIndexOutOfBoundsException.
    require(args.length >= 3, "Usage: ConsumerExample <kafkaServerURL> <kafkaServerPort> <topic>")
    val kafkaServerURL = args(0)
    val kafkaServerPort = args(1)
    val topic = args(2)

    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, s"$kafkaServerURL:$kafkaServerPort")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "TrainingConsumer")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      val listener = new RebalanceListener
      consumer.subscribe(Collections.singletonList(topic), listener)

      while (true) {
        val records = consumer.poll(1000)
        val it = records.iterator()
        while (it.hasNext) {
          val record = it.next()
          println(s"Received message: (${record.key()}, ${record.value()}) at offset ${record.offset()}")
        }
      }
    } finally {
      // The loop only ends through an exception; make sure the consumer is released.
      consumer.close()
    }
  }
}
/**
 * Rebalance listener that prints the partition numbers passed to each callback on a
 * single line, each number followed by a comma.
 */
class RebalanceListener extends ConsumerRebalanceListener {

  /** Prints the partition numbers of the newly assigned partitions. */
  override def onPartitionsAssigned(collection: util.Collection[TopicPartition]): Unit =
    printPartitions("Assigned Partitions:", collection)

  /** Prints the partition numbers of the revoked partitions. */
  override def onPartitionsRevoked(collection: util.Collection[TopicPartition]): Unit =
    printPartitions("Revoked Partitions:", collection)

  /** Writes `label`, then every partition number followed by a comma, then a newline. */
  private def printPartitions(label: String, partitions: util.Collection[TopicPartition]): Unit = {
    print(label)
    val it = partitions.iterator()
    while (it.hasNext) {
      // Interpolation instead of `Int + String`, which is deprecated.
      print(s"${it.next().partition()},")
    }
    println()
  }
}
开发者ID:TedBear42,项目名称:kafka_training,代码行数:62,代码来源:ConsumerExample.scala
示例6: FailedEventSpec
//设置package包名称以及导入依赖的类
package com.ovoenergy.delivery.service.kafka.process
import java.time.Clock
import com.ovoenergy.comms.model._
import com.ovoenergy.comms.model.email.ComposedEmailV2
import com.ovoenergy.delivery.service.domain._
import com.ovoenergy.delivery.service.util.ArbGenerator
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.scalacheck.Shapeless._
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
/**
 * Checks that `FailedEvent.email` publishes a `FailedV2` event whose fields are
 * derived from the composed email and the delivery error.
 */
class FailedEventSpec extends FlatSpec with Matchers with ArbGenerator with GeneratorDrivenPropertyChecks {

  private implicit val clock: Clock = Clock.systemUTC()

  private val composedEmail = generate[ComposedEmailV2]

  // Captures the event handed to the publisher stub below.
  private var failedEventPublished = Option.empty[FailedV2]

  // Publisher stub: records the event and returns a completed future with dummy metadata.
  private val publishEvent = (failed: FailedV2) => {
    failedEventPublished = Some(failed)
    Future.successful(new RecordMetadata(new TopicPartition("", 1), 1L, 1L, 1L, java.lang.Long.valueOf(1), 1, 1))
  }

  "FailedEvent" should "process failed email" in {
    val expectedError = APIGatewayUnspecifiedError(EmailGatewayError)

    FailedEvent.email(publishEvent)(composedEmail, APIGatewayUnspecifiedError(EmailGatewayError))

    // Fail with a readable message rather than a NoSuchElementException from Option.get.
    val published = failedEventPublished.getOrElse(fail("no FailedV2 event was published"))

    published.metadata.traceToken shouldBe composedEmail.metadata.traceToken
    published.metadata.source shouldBe "delivery-service"
    published.errorCode shouldBe expectedError.errorCode
    published.reason shouldBe expectedError.description
    published.internalMetadata shouldBe composedEmail.internalMetadata
  }
}
开发者ID:ovotech,项目名称:comms-delivery-service,代码行数:39,代码来源:FailedEventSpec.scala
示例7: IssuedForDeliveryEventSpec
//设置package包名称以及导入依赖的类
package com.ovoenergy.delivery.service.kafka.process
import java.time.Clock
import com.ovoenergy.comms.model.IssuedForDeliveryV2
import com.ovoenergy.comms.model.email.ComposedEmailV2
import com.ovoenergy.delivery.service.domain.GatewayComm
import com.ovoenergy.delivery.service.util.ArbGenerator
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.scalacheck.Shapeless._
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
/**
 * Checks that `IssuedForDeliveryEvent.email` publishes an `IssuedForDeliveryV2`
 * event whose fields are derived from the composed email and the gateway response.
 */
class IssuedForDeliveryEventSpec extends FlatSpec with Matchers with ArbGenerator with GeneratorDrivenPropertyChecks {

  private implicit val clock: Clock = Clock.systemUTC()

  private val gatewayComm = generate[GatewayComm]
  private val composedEmail = generate[ComposedEmailV2]

  // Captures the event handed to the publisher stub below.
  private var issuedForDeliveryEventPublished = Option.empty[IssuedForDeliveryV2]

  // Publisher stub: records the event and returns a completed future with dummy metadata.
  private val publishEvent = (issuedForDelivery: IssuedForDeliveryV2) => {
    issuedForDeliveryEventPublished = Some(issuedForDelivery)
    Future.successful(new RecordMetadata(new TopicPartition("", 1), 1L, 1L, 1L, 1L, 1, 1))
  }

  "IssuedForDeliveryEvent" should "process an issued email" in {
    IssuedForDeliveryEvent.email(publishEvent)(composedEmail, gatewayComm)

    // Fail with a readable message rather than a NoSuchElementException from Option.get.
    val published = issuedForDeliveryEventPublished.getOrElse(fail("no IssuedForDeliveryV2 event was published"))

    published.metadata.traceToken shouldBe composedEmail.metadata.traceToken
    published.metadata.source shouldBe "delivery-service"
    published.gatewayMessageId shouldBe gatewayComm.id
    published.gateway shouldBe gatewayComm.gateway
    published.internalMetadata shouldBe composedEmail.internalMetadata
    published.channel shouldBe gatewayComm.channel
  }
}
开发者ID:ovotech,项目名称:comms-delivery-service,代码行数:41,代码来源:IssuedForDeliveryEventSpec.scala
示例8: TwitterSinkTask
//设置package包名称以及导入依赖的类
package com.eneco.trading.kafka.connect.twitter
import java.util
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.connect.sink.{SinkRecord, SinkTask}
import scala.collection.JavaConverters._
import scala.util.{Success, Failure}
/**
 * Kafka Connect sink task that posts the string form of every record value as a
 * status update through a Twitter writer.
 */
class TwitterSinkTask extends SinkTask with Logging {

  /** Writer created in `start`; `None` until then. */
  var writer: Option[SimpleTwitterWriter] = None

  /** Reads the credentials from the task properties and creates the writer. */
  override def start(props: util.Map[String, String]): Unit = {
    val sinkConfig = new TwitterSinkConfig(props)
    val consumerKey = sinkConfig.getString(TwitterSinkConfig.CONSUMER_KEY_CONFIG)
    val consumerSecret = sinkConfig.getPassword(TwitterSinkConfig.CONSUMER_SECRET_CONFIG).value
    val token = sinkConfig.getString(TwitterSinkConfig.TOKEN_CONFIG)
    val secret = sinkConfig.getPassword(TwitterSinkConfig.SECRET_CONFIG).value
    writer = Some(new TwitterWriter(consumerKey, consumerSecret, token, secret))
  }

  /**
   * Sends every record value as a status update, then logs the outcome of each attempt.
   * All updates are attempted before any outcome is logged.
   */
  override def put(records: util.Collection[SinkRecord]): Unit = {
    val texts = records.asScala.map(_.value.toString)

    val attempts = texts.map { text =>
      val outcome = writer
        .map(_.updateStatus(text))
        .getOrElse(Failure(new IllegalStateException("twitter writer is not set")))
      (text, outcome)
    }

    attempts.foreach {
      case (text, Success(id)) =>
        log.info(s"successfully tweeted `${text}`; got assigned id ${id}")
      case (text, Failure(err)) =>
        log.warn(s"tweeting `${text}` failed: ${err.getMessage}")
    }
  }

  /** Nothing to release. */
  override def stop(): Unit = ()

  /** No buffering is done, so there is nothing to flush. */
  override def flush(map: util.Map[TopicPartition, OffsetAndMetadata]): Unit = ()

  /** Reports an empty version string. */
  override def version(): String = ""
}
开发者ID:heroku,项目名称:kafka-tweet-producer,代码行数:43,代码来源:TwitterSinkTask.scala
示例9: LoggingRebalanceListener
//设置package包名称以及导入依赖的类
package com.pagerduty.kafkaconsumer
import java.util
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
import org.slf4j.Logger
import scala.collection.JavaConversions._
/**
 * Rebalance listener that remembers the last assignment and logs which partitions of
 * `topic` were added or removed whenever a new assignment arrives.
 *
 * @param topic only partitions belonging to this topic appear in the log output
 * @param log   logger receiving the info messages
 */
class LoggingRebalanceListener(topic: String, log: Logger) extends ConsumerRebalanceListener {

  private var currentAssignment = Set.empty[TopicPartition]

  /** Diffs the new assignment against the previous one, stores it and logs the changes. */
  override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
    // Copy through a Java array so no implicit Java-to-Scala collection conversion is needed.
    val newAssignment: Set[TopicPartition] = partitions.toArray(new Array[TopicPartition](0)).toSet
    val addedPartitions = newAssignment -- currentAssignment
    val removedPartitions = currentAssignment -- newAssignment
    currentAssignment = newAssignment

    logPartitionChanges("added", addedPartitions)
    logPartitionChanges("removed", removedPartitions)
    logNewAssignment(newAssignment)
  }

  /** Revocations are ignored; changes are derived from consecutive assignments instead. */
  override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {}

  /** Logs one info line for a non-empty set of changed partitions. */
  protected def logPartitionChanges(changeVerb: String, delta: Set[TopicPartition]): Unit = {
    if (delta.nonEmpty) {
      val partitioningInfo = getTopicPartitioningInfo(delta)
      log.info(s"Partitions $changeVerb: topic=${topic}, partitions=[$partitioningInfo]")
    }
  }

  /** Hook for subclasses; does nothing here. */
  protected def logNewAssignment(partitions: Set[TopicPartition]): Unit = {}

  /**
   * Renders the partition numbers of `topic` contained in `partitions` as a sorted,
   * comma-separated list.
   *
   * @param partitions partitions to describe; entries of other topics are skipped
   * @param maxLength  optional upper bound on the length of the returned string
   * @return "unassigned" when no partition of `topic` is present, otherwise the list,
   *         truncated to at most `maxLength` characters when a limit is given
   */
  protected def getTopicPartitioningInfo(partitions: Set[TopicPartition], maxLength: Option[Int] = None): String = {
    def shorten(string: String, length: Int): String =
      if (string.length <= length) string
      // Too short for an ellipsis: "take(length - 3) + ..." would exceed the limit here.
      else if (length < 3) string.take(math.max(length, 0))
      else string.take(length - 3) + "..."

    val fullPartitioningInfo = getOrderedPartitionsForTopic(topic, partitions).mkString(",")
    if (fullPartitioningInfo.isEmpty) "unassigned"
    else maxLength.fold(fullPartitioningInfo)(limit => shorten(fullPartitioningInfo, limit))
  }

  /** Partition numbers of `topic` within `partitions`, in ascending order. */
  private def getOrderedPartitionsForTopic(topic: String, partitions: Set[TopicPartition]): Seq[Int] = {
    val partitionsForTopic = partitions.collect {
      case partition if partition.topic == topic => partition.partition
    }
    partitionsForTopic.toSeq.sorted
  }
}
开发者ID:PagerDuty,项目名称:scala-kafka-consumer,代码行数:56,代码来源:LoggingRebalanceListener.scala
示例10: RecordCallbackSpec
//设置package包名称以及导入依赖的类
package articlestreamer.aggregator.kafka
import articlestreamer.shared.BaseSpec
import articlestreamer.shared.kafka.RecordCallback
import com.github.ghik.silencer.silent
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
/**
 * Smoke tests for `RecordCallback.onCompletion`: each case only checks that the call
 * returns without throwing.
 */
class RecordCallbackSpec extends BaseSpec {

  val recordCallback = new RecordCallback

  //At the moment purely for coverage purpose
  "If any exception occurs" should "log it" in {
    recordCallback.onCompletion(null, new RuntimeException())
  }

  //At the moment purely for coverage purpose
  "If successfully sends record" should "log it" in {
    // Upper-case `L` suffix: the lower-case form is easily misread as the digit 1.
    //noinspection ScalaDeprecation
    val metadata = new RecordMetadata(new TopicPartition("", 0), 0L, 1L) : @silent
    recordCallback.onCompletion(metadata, null)
  }
}
开发者ID:firens,项目名称:article-streamer,代码行数:27,代码来源:RecordCallbackSpec.scala
示例11: KafkaEventSourceTest
//设置package包名称以及导入依赖的类
package process
import java.util
import java.util.Collections
import kpi.twitter.analysis.utils.{PredictedStatus, SentimentLabel, TweetSerDe}
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.scalatest.FunSuite
import org.scalatest.mockito.MockitoSugar
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import twitter4j.Status
/**
 * Unit tests for `KafkaEventSource` using a mocked `KafkaConsumer` and a mock clock.
 */
class KafkaEventSourceTest extends FunSuite with MockitoSugar {

  test("subscribe should be invoked once for correct topic") {
    val topicName = "fake"
    val mockConsumer = mock[KafkaConsumer[SentimentLabel, Status]]
    val mockTime = new MockTime

    // Instantiated only for its effect on the mock; the instance itself is not used.
    new KafkaEventSource(mockConsumer, topicName, mockTime)

    verify(mockConsumer, times(1)).subscribe(Collections.singletonList(topicName))
  }

  test("poll should return on max records") {
    val topicName = "fake"
    val mockConsumer = mock[KafkaConsumer[SentimentLabel, Status]]
    val mockTime = new MockTime

    // Every stubbed poll advances the mock clock by 1 ms and returns a single record.
    when(mockConsumer.poll(1000)).thenAnswer(new Answer[ConsumerRecords[SentimentLabel, Status]]() {
      override def answer(invocation: InvocationOnMock): ConsumerRecords[SentimentLabel, Status] = {
        mockTime.sleep(1)

        val record = new ConsumerRecord[SentimentLabel, Status](topicName, 0, 0, mock[SentimentLabel], mock[Status])
        // Key the record under its own topic/partition so the fixture is self-consistent
        // (it used to be filed under partition 1 while the record itself said partition 0).
        val tp = new TopicPartition(record.topic, record.partition)

        val recordsMap = new util.HashMap[TopicPartition, util.List[ConsumerRecord[SentimentLabel, Status]]]()
        val recordsList = new util.ArrayList[ConsumerRecord[SentimentLabel, Status]]()
        recordsList.add(record)
        recordsMap.put(tp, recordsList)

        new ConsumerRecords[SentimentLabel, Status](recordsMap)
      }
    })

    val kafkaEventSource = new KafkaEventSource(mockConsumer, topicName, mockTime)
    val records = kafkaEventSource.poll(1000, 1)

    assert(1 === records.size)
    assert(1 === mockTime.currentMillis)
  }
}
开发者ID:GRpro,项目名称:TwitterAnalytics,代码行数:58,代码来源:KafkaEventSourceTest.scala
示例12: ConsumersUtils
//设置package包名称以及导入依赖的类
package com.landoop.kafka.ws.utils
import com.landoop.kafka.ws.KafkaWSContext
import kafka.admin.AdminClient
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.TopicPartition
/**
 * Helpers for reading the committed offsets of a consumer group through the Kafka
 * admin client. Each call creates its own client and closes it before returning.
 */
object ConsumersUtils {

  /**
   * Committed offsets of `consumerGroup` restricted to one topic.
   *
   * @param consumerGroup consumer group id
   * @param topic         topic whose partitions are of interest
   * @param context       supplies the broker list via `context.config.brokers`
   * @return map from partition number to the offset listed for the group
   */
  def getOffsetsFor(consumerGroup: String, topic: String)(implicit context: KafkaWSContext): Map[Int, Long] =
    withAdminClient { adminClient =>
      adminClient.listGroupOffsets(consumerGroup).collect {
        case (topicPartition, offset) if topicPartition.topic() == topic =>
          topicPartition.partition() -> offset
      }
    }

  /**
   * Committed offsets of `consumerGroup` for every topic partition.
   *
   * @param consumerGroup consumer group id
   * @param context       supplies the broker list via `context.config.brokers`
   * @return map from topic partition to the offset listed for the group
   */
  def getOffsets(consumerGroup: String)(implicit context: KafkaWSContext): Map[TopicPartition, Long] =
    withAdminClient(_.listGroupOffsets(consumerGroup))

  /** Runs `f` with a freshly created admin client and always closes the client afterwards. */
  private def withAdminClient[A](f: AdminClient => A)(implicit context: KafkaWSContext): A = {
    val adminClient = AdminClient.create(Map(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> context.config.brokers))
    try {
      f(adminClient)
    } finally {
      adminClient.close()
    }
  }
}
开发者ID:Landoop,项目名称:kafka-ws,代码行数:31,代码来源:ConsumersUtils.scala
注：本文中的org.apache.kafka.common.TopicPartition类示例整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论