Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

How to Handle a Kafka Record with a Class-Level @KafkaListener with no @KafkaHandler for the Record Value

Normally, when we define a class-level @KafkaListener with method-level @KafkaHandler methods, we can mark one of them as the default (@KafkaHandler(isDefault = true)) to handle unexpected payloads.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#class-level-kafkalistener

But what should we do with a record whose value type matches no handler if we don't have a default method?

Question from: https://stackoverflow.com/questions/65943168/how-to-handle-a-kafka-record-with-a-class-level-kafkalistener-with-no-kafkahan

1 Answer

With Spring for Apache Kafka 2.6 and later, you can configure a SeekToCurrentErrorHandler to send such records to a dead-letter topic immediately, without retries, by examining the exception in a back-off function.

Here is a simple Spring Boot application that demonstrates the technique:

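import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

@SpringBootApplication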
public class So59256214Application {

    public static void main(String[] args) {
        SpringApplication.run(So59256214Application.class, args);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so59256214").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so59256214.DLT").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "so59256214.DLT", topics = "so59256214.DLT")
    void listen(ConsumerRecord<?, ?> in) {
        System.out.println("dlt: " + in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Object> template) {
        return args -> {
            template.send("so59256214", 42);
            template.send("so59256214", 42.0);
            template.send("so59256214", "No handler for this");
        };
    }

    @Bean
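    ErrorHandler eh(KafkaOperations<String, Object> template) {
        // Records that exhaust their retries are published to <topic>.DLT.
        SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template));
        // Zero retries: the record is recovered (dead-lettered) on the first failure.
        BackOff neverRetryOrBackOff = new FixedBackOff(0L, 0);
        // Up to 3 retries, 2 seconds apart, for every other failure.
        BackOff normalBackOff = new FixedBackOff(2000L, 3);
        // Choose the back-off per failed record by looking at the exception message.
        eh.setBackOffFunction((rec, ex) -> {
            if (ex.getMessage().contains("No method found for class")) {
                return neverRetryOrBackOff;
            }
            else {
                return normalBackOff;
            }
        });
        return eh;
    }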

}

@Component
@KafkaListener(id = "so59256214", topics = "so59256214")
class Listener {

    @KafkaHandler
    void integerHandler(Integer in) {
        System.out.println("int: " + in);
    }

    @KafkaHandler
    void doubleHandler(Double in) {
        System.out.println("double: " + in);
    }

}

Properties (e.g. in application.properties):

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Result:

int: 42
double: 42.0
dlt: ConsumerRecord(topic = so59256214.DLT, ...
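
The back-off function above matches on the text of the top-level exception message. If you would rather not depend on that message being non-null, or on the text appearing at the top level, here is a minimal plain-Java sketch of a more defensive check that also walks the cause chain. The class and method names (NoHandlerCheck, isNoHandlerFailure) are hypothetical and not part of Spring for Apache Kafka, and the marker string is simply copied from the check above, so treat it as an assumption that could change between versions.

```java
// Hypothetical helper, not part of Spring for Apache Kafka.
final class NoHandlerCheck {

    // Marker text copied from the back-off function above (assumed stable).
    private static final String MARKER = "No method found for class";

    // Guard against unusually long or cyclic cause chains.
    private static final int MAX_DEPTH = 20;

    private NoHandlerCheck() {
    }

    /** Returns true if the exception or one of its causes carries the marker text. */
    static boolean isNoHandlerFailure(Throwable ex) {
        int depth = 0;
        for (Throwable t = ex; t != null && depth < MAX_DEPTH; t = t.getCause(), depth++) {
            String message = t.getMessage();
            if (message != null && message.contains(MARKER)) {
                return true;
            }
        }
        return false;
    }
}
```

With this on the classpath, the condition in the back-off function could become NoHandlerCheck.isNoHandlerFailure(ex); the rest of the error handler stays the same.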
