I am new to Kafka and am writing a cron job in Spring Boot that validates records in a SQL database against a Kafka topic. In production the job needs to run once a day, in the morning. For testing I scheduled it to run every 15 minutes, and it works as expected. But as soon as I changed the cron schedule to every 2 hours, the consumer started returning records that had already been read and processed, i.e. duplicates. I am committing the offsets manually with commitSync (in the finally block below). For example, I sent 3 records to the topic, but the consumer received more than 5k records, mostly duplicates.
Below is the consumer code and its properties.
public Map<String, Object> getKafkaConsumerProps() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Auto-commit is disabled; offsets are committed manually in consumeCosts().
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // Note: this interval has no effect while auto-commit is disabled.
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "cronKafkaConsumer");
    return props;
}
public List<CostKafkaModel> consumeCosts() {
    KafkaConsumer<String, CostKafkaModel> consumer = new KafkaConsumer<>(
            getKafkaConsumerProps(), new StringDeserializer(),
            new JsonDeserializer<>(CostKafkaModel.class));
    List<CostKafkaModel> kafkaModelList = new ArrayList<>();
    try {
        consumer.subscribe(Arrays.asList("deltaCosts"));
        ConsumerRecords<String, CostKafkaModel> records = consumer.poll(1000);
        for (ConsumerRecord<String, CostKafkaModel> record : records) {
            kafkaModelList.add(record.value());
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        consumer.commitSync();
        consumer.close();
    }
    return kafkaModelList;
}
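For context, the job is triggered with Spring's @Scheduled. Here is a minimal sketch of the scheduling side; the class name and method wiring are illustrative, only the cron expressions correspond to the schedules I described above:

import java.util.List;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class CostValidationJob {

    // Testing schedule: every 15 minutes (works as expected).
    // @Scheduled(cron = "0 */15 * * * *")

    // Current schedule: every 2 hours (this is where the duplicates appear).
    @Scheduled(cron = "0 0 */2 * * *")
    public void validateCosts() {
        List<CostKafkaModel> kafkaRecords = consumeCosts(); // the method shown above
        // ... compare kafkaRecords against the SQL rows ...
    }
}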
Any help would be appreciated.