
java - kafka consumer to dynamically detect topics added

I'm using KafkaConsumer to consume messages from a Kafka server (topics).

  • It works fine for topics created before the consumer code is started.

The problem is that it does not pick up topics created dynamically (i.e. after the consumer has started), even though the API documentation linked below says pattern subscription supports dynamic topic creation.

Kafka version used: 0.9.0.1

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Here is the Java code:

    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    import java.util.regex.Pattern;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // "\d" must be escaped as "\\d" in a Java string literal.
    Pattern r = Pattern.compile("siddu(\\d)*");

    // HandleRebalance is my ConsumerRebalanceListener implementation.
    consumer.subscribe(r, new HandleRebalance());
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(partition.partition() + ": " + record.offset() + ": " + record.value());
                }
                // Commit the offset after the last record processed for this partition.
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    } finally {
        consumer.close();
    }

NOTE: My topic names match the regular expression, and if I restart the consumer it does start reading the messages pushed to the new topic.

Any help is really appreciated.


1 Answer


There was an answer to this in the Apache Kafka mailing list archives; I am copying it below:

The consumer supports a configuration option, "metadata.max.age.ms", which controls how often topic metadata is fetched. By default this is set fairly high (5 minutes), which means it can take up to 5 minutes to discover new topics matching your regular expression. You can set it lower to discover new topics more quickly.

So in your props you can add:

props.put("metadata.max.age.ms", 5000);

This will cause your consumer to find out about new topics every 5 seconds.
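
For reference, here is a minimal sketch of how that property slots into a pattern-subscribing consumer. It reuses the broker address, group id, and topic pattern from the question, and substitutes an inline ConsumerRebalanceListener for the HandleRebalance class from the question; the 5000 ms value is only an example and trades faster topic discovery against more frequent metadata requests.

    import java.util.Collection;
    import java.util.Properties;
    import java.util.regex.Pattern;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // Refresh topic metadata every 5 seconds (default is 300000 ms, i.e. 5 minutes),
    // so newly created topics matching the subscribed pattern are picked up quickly.
    props.put("metadata.max.age.ms", "5000");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // Pattern subscription: any topic named "siddu" followed by digits is added
    // to the subscription on the next metadata refresh, triggering a rebalance.
    consumer.subscribe(Pattern.compile("siddu(\\d)*"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("Assigned: " + partitions);
        }
    });

With this in place, a topic such as siddu3 created while the consumer is running should start being consumed within roughly one metadata refresh interval, without restarting the consumer.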

