Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Spring Cloud Stream Kafka Streams binder fails serialization with more than one function in the same application

I have been trying out Spring Cloud Stream with Kafka Streams, and I have run into an issue whose root cause I cannot find.

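Basically I have the following topology: video.video.video-events.json -> (branching events per type) -> video.video.video-view-events.json -> (aggregate plays per id & time window) -> video.video.video-views-count.json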
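To make the two stages concrete, here is a minimal plain-Java sketch of the same data flow, with no Kafka involved. It is only an illustration of the intent, not the actual implementation: the `PipelineSketch` and `VideoEvent` names, the `play` event type, and the 60-second tumbling window are all assumptions made for the example (`active_view` is the event type that shows up in the sample payload).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PipelineSketch {

    // Hypothetical, simplified event; the real events are JSON strings.
    static final class VideoEvent {
        final int videoId;
        final String eventType;
        final long timestampMs;

        VideoEvent(int videoId, String eventType, long timestampMs) {
            this.videoId = videoId;
            this.eventType = eventType;
            this.timestampMs = timestampMs;
        }
    }

    // Stage 1: keep only the events of one type (stands in for the branching step).
    static List<VideoEvent> branchByType(List<VideoEvent> events, String eventType) {
        List<VideoEvent> branch = new ArrayList<>();
        for (VideoEvent e : events) {
            if (eventType.equals(e.eventType)) {
                branch.add(e);
            }
        }
        return branch;
    }

    // Stage 2: count events per video id and tumbling window.
    // The map key is "<videoId>@<windowStartMs>".
    static Map<String, Long> countPerWindow(List<VideoEvent> views, long windowMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (VideoEvent e : views) {
            long windowStart = (e.timestampMs / windowMs) * windowMs;
            counts.merge(e.videoId + "@" + windowStart, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<VideoEvent> events = List.of(
                new VideoEvent(1, "active_view", 1_000L),
                new VideoEvent(1, "active_view", 2_000L),
                new VideoEvent(1, "play", 3_000L),          // filtered out by stage 1
                new VideoEvent(1, "active_view", 61_000L)); // falls into the next window
        // prints {1@0=2, 1@60000=1}
        System.out.println(countPerWindow(branchByType(events, "active_view"), 60_000L));
    }
}
```

The point of the sketch is that stage 1 only forwards the original values unchanged, while stage 2 is the only place where a new value type (the windowed count) is produced.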

To achieve this with Spring Cloud Stream, I have two different functions:

@Bean
public Function<KStream<Integer, String>, KStream<Integer, String>[]> processVideoEvents() {
    // ... (body omitted)
}

@Bean
public Function<KStream<Integer, String>, KStream<Integer, WindowedVideoViewsCountMetric>> processVideoViewEvents() {
    // ... (body omitted)
}

The bindings are configured as follows:

spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
  cloud:
    stream:
      function:
        definition: processVideoEvents;processVideoViewEvents
      bindings:

        processVideoEvents-in-0:
          destination: video.video.video-events.json
        processVideoEvents-out-0:
          destination: video.video.video-view-events.json

        processVideoViewEvents-in-0:
          destination: video.video.video-view-events.json
        processVideoViewEvents-out-0:
          destination: video.video.video-views-count.json

If I enable only one of them, everything works fine. However, when I enable both to build the full data pipeline, I get the following error:

org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic video.video.video-view-events.json for task 0_0 due to:
org.apache.kafka.common.errors.SerializationException: Can't serialize data [{"referer":"https://preview.xyz.com/","device":null,"app":"web","appVersion":null,"extraFields":null,"version":"0.1.0","event_type":"active_view","user_id":0,"user_urn":"urn:xyz:users:user:0","video_id":1,"video_urn":"urn:xyz:videos:video:4","account_key":"<some key>","site_section":"homepage","user_agent":"PostmanRuntime/7.26.1","client_timestamp":"2000-01-01 00:00:00.000 +0000","server_timestamp":"2020-07-08 14:40:30.549 +0000"}] for topic [video.video.video-view-events.json]
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:167) ~[kafka-streams-2.6.0.jar:na]
    ...
Caused by: org.apache.kafka.common.errors.SerializationException: Can't serialize data [{"referer":"https://preview.xing.com/","device":null,"app":"web","appVersion":null,"extraFields":null,"version":"0.1.0","event_type":"active_view","user_id":0,"user_urn":"urn:x-xing:users:user:0","video_id":1,"video_urn":"urn:x-xing:videos:video:4","account_key":"52a3176c8ae6e7af0099476aeccf4de0","site_section":"startpage","user_agent":"PostmanRuntime/7.26.1","client_timestamp":"2000-01-01 00:00:00.000 +0000","server_timestamp":"2020-07-08 14:40:30.549 +0000"}] for topic [video.video.video-view-events.json]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Incompatible types: declared root type ([simple type, class com.xyz.video.model.video.metrics.count.WindowedVideoViewsCountMetric]) vs `java.lang.String`

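I don't understand why Spring is trying to serialize the JSON string as the output type of the second function (WindowedVideoViewsCountMetric) when that function's input is declared as a KStream<Integer, String>. The failure also happens while writing to video.video.video-view-events.json, which is the output topic of the first function, not the second.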
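One way to narrow this down, offered as a diagnostic sketch rather than a confirmed fix, would be to stop relying on inferred Serdes and pin them explicitly on the bindings around the intermediate topic, and to give each function its own Kafka Streams application id. The fragment below assumes the binder's documented per-binding `valueSerde` properties and the per-function `applicationId` property; the two application id values are made up for illustration.

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            functions:
              processVideoEvents:
                applicationId: video-events-processor        # hypothetical id
              processVideoViewEvents:
                applicationId: video-view-events-processor   # hypothetical id
          bindings:
            # Intermediate topic is written and read as plain strings.
            processVideoEvents-out-0:
              producer:
                valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
            processVideoViewEvents-in-0:
              consumer:
                valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
```

If the error disappears with explicit Serdes, that would suggest the Serde inferred for the second function's output type is being applied to the first function's output binding; if it persists, the Serde configuration is probably not where the problem lies.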

Any idea?

Question from: https://stackoverflow.com/questions/65917206/spring-cloud-kafka-streams-binder-failing-serialization-with-more-than-one-funct
