Support for Kafka manual commits looks like a relatively new feature in Camel, and the documentation isn't particularly clear. I'm using Camel 2.22.1.
From the description of your problem, you are looking for "at least once" semantics: you want to be able to re-process a message when there is an issue with it. The consequence of this approach is that no other message in a partition that holds a failing message can be processed (or even seen) until the application processes the failing one successfully. If a downstream service fails, this would likely block all partitions of the topic until the service is back up.
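To make the blocking behaviour concrete, here is a simplified, stdlib-only model of a single partition handled with "at least once" semantics. It is not Camel code: it assumes the consumer simply retries the message at the current offset until it succeeds and only then advances. The failing "service" in `main` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

public class HeadOfLineSketch {

    // Simplified model of one partition: keep retrying the message at the
    // current offset and only advance once it succeeds, so every later
    // offset waits behind a failing one. Returns offsets in attempt order.
    static List<Integer> consume(int messageCount, IntPredicate attempt) {
        List<Integer> attempts = new ArrayList<>();
        int offset = 0;
        while (offset < messageCount) {
            attempts.add(offset);
            if (attempt.test(offset)) {
                offset++; // success: move on to the next message
            }
            // failure: stay on the same offset and try again
        }
        return attempts;
    }

    public static void main(String[] args) {
        // Hypothetical downstream service that fails the first two tries of offset 1.
        int[] failuresLeft = {2};
        List<Integer> attempts = consume(3, off -> {
            if (off == 1 && failuresLeft[0] > 0) {
                failuresLeft[0]--;
                return false;
            }
            return true;
        });
        System.out.println(attempts); // [0, 1, 1, 1, 2]
    }
}
```

Offset 2 is not attempted until offset 1 finally succeeds, and offset 1 is attempted more than once: that is the trade-off of "at least once".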
The Kafka uri to get this to work would look like this:
```
kafka:TestLog?brokers=localhost:9092&groupId=kafkaGroup&maxPollRecords=3&consumersCount=1&autoOffsetReset=earliest&autoCommitEnable=false&allowManualCommit=true&breakOnFirstError=true
```
Breaking that down a bit:
- `kafka:TestLog`: specifies the Kafka topic to consume from
- `brokers=localhost:9092`: specifies the bootstrap servers for the Kafka cluster
- `groupId=kafkaGroup`: specifies the Kafka consumer group
- `consumersCount=1`: specifies the number of Kafka consumers for that Camel route
The last two settings are important when consuming from a Kafka topic with several partitions. Within a consumer group each partition is consumed by at most one consumer, so these settings need to be tuned with the number of Camel instances you plan to run in mind.
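As a rough illustration of that sizing, the sketch below counts the consumers in the group. It assumes every Camel instance runs the same route with the same `groupId` (so each instance contributes `consumersCount` consumers) and relies on Kafka's rule that a partition is assigned to at most one consumer per group. The method names and the example numbers are hypothetical.

```java
public class ConsumerCountSketch {

    // Total Kafka consumers in the group: every Camel instance runs the route,
    // and each route starts `consumersCount` consumers sharing one groupId.
    static int totalConsumers(int camelInstances, int consumersCount) {
        return camelInstances * consumersCount;
    }

    // A partition is assigned to at most one consumer in a group, so any
    // consumers beyond the partition count receive no partitions and sit idle.
    static int idleConsumers(int partitions, int camelInstances, int consumersCount) {
        return Math.max(0, totalConsumers(camelInstances, consumersCount) - partitions);
    }

    public static void main(String[] args) {
        // e.g. a 6-partition topic, 2 Camel instances, consumersCount=4:
        // 8 consumers in the group, 2 of them idle
        System.out.println(idleConsumers(6, 2, 4)); // 2
    }
}
```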
The more interesting settings for getting "at least once" semantics:
- `autoCommitEnable=false`: turns off auto-committing of offsets so we can use manual commits.
- `allowManualCommit=true`: turns on manual commits, giving us access to the `KafkaManualCommit` capability (see code below).
- `breakOnFirstError=true`: when this is true, the route stops processing the rest of the messages in the batch received on the last poll of the topic.
- `maxPollRecords=3`: specifies the maximum number of messages returned by a single poll of the Kafka topic. It is probably a good idea to keep this low, since an issue with one message in the batch would cause all of the messages in the batch to be re-processed.
- `autoOffsetReset=earliest`: controls where the consumer starts reading when the group has no committed offset for a partition (or the committed offset no longer exists on the broker); `earliest` starts from the oldest available message (more on that in a bit).
The Camel route would look something like this:
```java
// inside RouteBuilder.configure(); needs org.apache.camel.Exchange, java.util.UUID,
// org.apache.camel.component.kafka.KafkaConstants and ...kafka.KafkaManualCommit
from(kafkaUrl)
    .routeId("consumeFromKafka")
    .process(exchange -> {
        LOGGER.info(this.dumpKafkaDetails(exchange));
    })
    .process(exchange -> {
        // do something
    })
    .process(exchange -> {
        // do something else
    })
    .process(exchange -> {
        // the file producer reads the target file name from the CamelFileName header
        exchange.getIn().setHeader(Exchange.FILE_NAME, UUID.randomUUID().toString() + ".txt");
    })
    .to("file://files")
    // at the end of the route, manage the manual commit
    .process(exchange -> {
        // manually commit the offset only if this is the last message in the batch
        Boolean lastOne = exchange.getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class);
        if (Boolean.TRUE.equals(lastOne)) { // avoids an NPE if the header is missing
            KafkaManualCommit manual =
                exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
            if (manual != null) {
                LOGGER.info("manually committing the offset for batch");
                manual.commitSync();
            }
        } else {
            LOGGER.info("NOT time to commit the offset yet");
        }
    });
```
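To see why a failure re-processes the whole batch, here is a simplified, stdlib-only model of the commit logic in the route above. It is not Camel's internal implementation; it assumes each poll returns up to `maxPollRecords` records starting at the committed offset, that the offset is committed only after the last record of a batch succeeds, and that a failure abandons the rest of the batch (`breakOnFirstError=true`) so the next poll starts again from the last committed offset. The offsets and the failing record are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

public class BatchCommitSketch {

    // Simplified model: returns every offset handed to the route, in order.
    // If `process` never succeeds for some offset, this loops forever,
    // which mirrors a partition blocked behind a failing message.
    static List<Long> run(long committed, long logEnd, int maxPollRecords, LongPredicate process) {
        List<Long> processed = new ArrayList<>();
        while (committed < logEnd) {
            long batchEnd = Math.min(committed + maxPollRecords, logEnd); // one poll
            boolean batchOk = true;
            for (long offset = committed; offset < batchEnd; offset++) {
                processed.add(offset);
                if (!process.test(offset)) {
                    batchOk = false; // break on first error: skip the rest of the batch
                    break;
                }
            }
            if (batchOk) {
                committed = batchEnd; // commit after the last record: next offset to read
            }
            // otherwise nothing is committed and the same batch is polled again
        }
        return processed;
    }

    public static void main(String[] args) {
        int[] failuresLeft = {1}; // offset 94 fails once, then succeeds
        List<Long> processed = run(90, 96, 3, off -> {
            if (off == 94 && failuresLeft[0] > 0) {
                failuresLeft[0]--;
                return false;
            }
            return true;
        });
        System.out.println(processed); // [90, 91, 92, 93, 94, 93, 94, 95]
    }
}
```

Offset 93 succeeded the first time but is processed again because its batch was never committed, which is why a small `maxPollRecords` limits the amount of duplicate work.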
After running this route and getting an error, you can see the state of the consumer group with this command:

```
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group kafkaGroup --describe
```

That might yield this result:

```
TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
TestLog  0          92              95              3
```
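The LAG column is just the distance between the end of the log and the group's current offset (95 - 92 = 3). A small sketch of that arithmetic follows; `lagFromRow` is a hypothetical helper that assumes the five-column layout shown above. Other Kafka versions print extra columns (for example a leading GROUP column, or CONSUMER-ID/HOST/CLIENT-ID), and CURRENT-OFFSET can be `-` when nothing has been committed, so the column positions would need adjusting for real output.

```java
public class LagSketch {

    // Lag: how far the group's committed position trails the end of the log.
    static long lag(long currentOffset, long logEndOffset) {
        return logEndOffset - currentOffset;
    }

    // Hypothetical parser for one data row in the layout shown above:
    // TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
    static long lagFromRow(String row) {
        String[] cols = row.trim().split("\\s+");
        long current = Long.parseLong(cols[2]);
        long logEnd = Long.parseLong(cols[3]);
        return lag(current, logEnd);
    }

    public static void main(String[] args) {
        System.out.println(lagFromRow("TestLog  0  92  95  3")); // 3
    }
}
```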
This is where the `autoOffsetReset` setting comes into play. The current offset is the position the consumer group will consume from next. If the message at that offset (92) is the one that fails, the group falls further behind as more messages arrive (in this case, two more after the failing one). With the settings above, Camel keeps re-processing the message at offset 92 until it succeeds. If the Camel route is stopped and started again, the application resumes from the committed offset (92), not from the end of the log (95). `autoOffsetReset` decides what happens when the group has no valid committed offset (for example, a brand-new group, or a committed offset whose records have been deleted by retention): `earliest` starts from the oldest retained message, whereas `latest` would jump to the end of the partition, and any messages before that point would be "lost".
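The resume position can be modelled as follows. This is a simplified sketch of Kafka's documented `auto.offset.reset` behaviour (the reset policy applies only when there is no committed offset or it is no longer valid on the broker), not Camel or Kafka client code; the `none` policy, which throws instead of resetting, is left out, and the offsets are illustrative.

```java
public class StartOffsetSketch {

    enum ResetPolicy { EARLIEST, LATEST }

    // Simplified model of where a consumer group resumes reading a partition.
    // A committed offset within [logStart, logEnd] is always used; the reset
    // policy only applies when there is no committed offset (null) or it is
    // out of range, e.g. because retention deleted those records.
    static long startOffset(Long committed, long logStart, long logEnd, ResetPolicy reset) {
        if (committed != null && committed >= logStart && committed <= logEnd) {
            return committed;
        }
        return reset == ResetPolicy.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        System.out.println(startOffset(92L, 0, 95, ResetPolicy.LATEST));    // 92
        System.out.println(startOffset(null, 0, 95, ResetPolicy.EARLIEST)); // 0
        System.out.println(startOffset(null, 0, 95, ResetPolicy.LATEST));   // 95
    }
}
```

The last call is the "lost messages" case: with no valid committed offset, `latest` skips offsets 0 to 94 entirely.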
A sample application is available here