I am trying to calculate the rate of incoming events per minute from a Kafka topic based on event time. I am using TumblingEventTimeWindows of 1 minute for this. The code snippet is given below.
I have observed that if no events arrive for a particular window, e.g. from 2:34 to 2:35, then the previous window (2:33 to 2:34) is never closed. I understand the risk of losing data for the 2:33 to 2:34 window (which can happen due to a system failure, a large Kafka lag, etc.), but I cannot wait indefinitely. I need to close this window after waiting for a certain period of time, and subsequent windows can continue after the system recovers. How can I achieve this?
```java
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                                                              // max restart attempts
        org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)  // delay between attempts
));
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
executionEnvironment.setParallelism(1);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "AllEventCountConsumerGroup");
FlinkKafkaConsumer<String> kafkaConsumer =
        new FlinkKafkaConsumer<>("event_input_topic", new SimpleStringSchema(), properties);
DataStreamSource<String> kafkaDataStream = executionEnvironment.addSource(kafkaConsumer);

kafkaDataStream
        .flatMap(new EventFlatter())
        .filter(Objects::nonNull)
        .assignTimestampsAndWatermarks(WatermarkStrategy
                .<Entity>forMonotonousTimestamps()
                .withIdleness(Duration.ofSeconds(60))
                .withTimestampAssigner((SerializableTimestampAssigner<Entity>) (element, recordTimestamp) -> element.getTimestamp()))
        // Note: this second assigner replaces the watermarks produced by the strategy above
        // (including its idleness handling); EntityWatermarkStrategy has no idleness support.
        .assignTimestampsAndWatermarks(new EntityWatermarkStrategy())
        .keyBy((KeySelector<Entity, String>) Entity::getTenant)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .allowedLateness(Time.seconds(10))
        .aggregate(new EventCountAggregator())
        .addSink(eventRateProducer);

private static class EntityWatermarkStrategy implements WatermarkStrategy<Entity> {
    @Override
    public WatermarkGenerator<Entity> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new EntityWatermarkGenerator();
    }
}

private static class EntityWatermarkGenerator implements WatermarkGenerator<Entity> {
    private long maxTimestamp;

    public EntityWatermarkGenerator() {
        this.maxTimestamp = Long.MIN_VALUE + 1;
    }

    @Override
    public void onEvent(Entity event, long eventTimestamp, WatermarkOutput output) {
        // The watermark only moves when a newer timestamp arrives, so it stalls while the topic is idle.
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Emits a watermark 2 ms ahead of the newest event; Flink's built-in generators
        // use maxTimestamp - outOfOrderness - 1 instead.
        output.emitWatermark(new Watermark(maxTimestamp + 2));
    }
}
```
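A common workaround is to let the watermark generator itself advance event time once the input has been quiet for a while. Note that `withIdleness(...)` only marks an input as idle so that downstream operators stop waiting for it; it does not move the watermark forward, so with parallelism 1 and a single source nothing advances event time while no events arrive. The sketch below is a plain-Java model of that idea with no Flink dependency, so the logic can be checked in isolation. The class name `IdleAwareWatermarkModel`, the `idleTimeoutMs` parameter and the assumption that event time tracks wall-clock time while idle are all illustrative, not a Flink API. In a real job the same logic would sit in a `WatermarkGenerator<Entity>`, with `onEvent` updating the state and `onPeriodicEmit` emitting `currentWatermark(System.currentTimeMillis())`.

```java
/**
 * Plain-Java model (no Flink dependency) of a watermark generator that keeps
 * event time moving after the input has been idle for longer than a timeout.
 * Hypothetical class; not a Flink API.
 */
final class IdleAwareWatermarkModel {
    private final long outOfOrdernessMs; // how far out of order events may arrive
    private final long idleTimeoutMs;    // how long to wait before advancing on wall-clock time

    private boolean seenEvent = false;
    private long maxTimestamp;                 // highest event timestamp seen so far
    private long lastEventWallClockMs;         // wall-clock time when the latest event arrived
    private long lastEmitted = Long.MIN_VALUE; // watermarks must never go backwards

    IdleAwareWatermarkModel(long outOfOrdernessMs, long idleTimeoutMs) {
        this.outOfOrdernessMs = outOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Corresponds to WatermarkGenerator#onEvent. */
    void onEvent(long eventTimestamp, long wallClockMs) {
        maxTimestamp = seenEvent ? Math.max(maxTimestamp, eventTimestamp) : eventTimestamp;
        seenEvent = true;
        lastEventWallClockMs = wallClockMs;
    }

    /** Corresponds to WatermarkGenerator#onPeriodicEmit; returns the watermark to emit. */
    long currentWatermark(long wallClockMs) {
        if (!seenEvent) {
            return lastEmitted; // no event yet, nothing to base a watermark on
        }
        // Usual bounded-out-of-orderness watermark.
        long watermark = maxTimestamp - outOfOrdernessMs - 1;
        long idleForMs = wallClockMs - lastEventWallClockMs;
        if (idleForMs > idleTimeoutMs) {
            // Assumption: while idle, event time is taken to advance at the same
            // rate as wall-clock time, starting once the timeout has elapsed.
            watermark += idleForMs - idleTimeoutMs;
        }
        lastEmitted = Math.max(lastEmitted, watermark);
        return lastEmitted;
    }
}
```

For example, with `outOfOrdernessMs = 0` and `idleTimeoutMs = 60_000`, a single event at 2:33:30 keeps the watermark at 2:33:29.999 until the topic has been quiet for more than a minute; after that it creeps forward and reaches 2:34:00, closing the 2:33 to 2:34 window, about 90 seconds after the event arrived. The trade-off, which the question already accepts, is that events arriving after the watermark has been pushed ahead (for example after a large Kafka lag) are treated as late.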
Also, I tried adding some custom triggers, but it didn't help. I am using Apache Flink 1.11.
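The usual trigger-based approach is to fire a window either when the watermark passes its end (the normal event-time rule) or when a processing-time deadline expires, since processing-time timers fire on wall-clock time regardless of the watermark. In Flink this would be a custom `Trigger` whose `onElement` registers both an event-time timer for `window.maxTimestamp()` and a processing-time timer via `ctx.registerProcessingTimeTimer(...)`, and whose `onProcessingTime` returns `TriggerResult.FIRE_AND_PURGE`. The snippet below only models that firing rule in plain Java for a single window; `TimeoutTriggerModel`, its fields, and the choice to start the deadline at the window's first element are assumptions made for illustration.

```java
/**
 * Plain-Java model of the firing rule for one window: fire when the watermark
 * covers the window's last millisecond, or when a processing-time deadline
 * (started by the window's first element) expires. Hypothetical class.
 */
final class TimeoutTriggerModel {
    private final long windowMaxTimestamp;            // window end - 1, like TimeWindow#maxTimestamp()
    private final long timeoutMs;                     // processing-time budget per window
    private long deadlineWallClockMs = Long.MAX_VALUE; // unset until the first element
    private boolean fired = false;

    TimeoutTriggerModel(long windowEnd, long timeoutMs) {
        this.windowMaxTimestamp = windowEnd - 1;
        this.timeoutMs = timeoutMs;
    }

    /** Like Trigger#onElement: start the processing-time deadline on the first element. */
    void onElement(long wallClockMs) {
        if (deadlineWallClockMs == Long.MAX_VALUE) {
            deadlineWallClockMs = wallClockMs + timeoutMs;
        }
    }

    /** Returns true exactly once, on whichever condition is met first. */
    boolean shouldFire(long watermark, long wallClockMs) {
        if (fired) {
            return false;
        }
        boolean eventTimeDue = watermark >= windowMaxTimestamp; // normal event-time firing
        boolean timedOut = wallClockMs >= deadlineWallClockMs;  // processing-time fallback
        fired = eventTimeDue || timedOut;
        return fired;
    }
}
```

Firing at most once loosely mirrors `FIRE_AND_PURGE`, which clears the window contents when the timeout fires, so the later event-time firing has nothing left to emit.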
Can somebody suggest what I am doing wrong?
When I push more data with a newer timestamp (say t+1) to the topic, the result for the earlier window (t) is emitted, but the window for t+1 then gets stuck in the same way as t.
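This pattern is how event-time windows behave by design: a window fires only once the watermark reaches its last millisecond (`end - 1`), and with a monotonous-timestamps watermark the watermark is the highest timestamp seen so far minus 1 ms, so it only moves when a newer event arrives. The following plain-Java simulation replays a list of event timestamps and records which 1-minute windows fire. It has no Flink dependency; `TumblingWindowSimulation` is a hypothetical name, and late events are simply dropped (`allowedLateness` is ignored).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/**
 * Plain-Java simulation of 1-minute tumbling event-time windows with a
 * monotonous-timestamps watermark (highest timestamp seen - 1 ms).
 * Hypothetical class; late events are dropped, allowedLateness is ignored.
 */
final class TumblingWindowSimulation {
    static final long WINDOW_MS = 60_000L;

    /** Start of the tumbling window containing ts (zero offset, non-negative ts). */
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    /** Replays events in order and returns the start of each window that fired. */
    static List<Long> firedWindows(long... eventTimestamps) {
        List<Long> fired = new ArrayList<>();
        TreeSet<Long> open = new TreeSet<>();
        long watermark = Long.MIN_VALUE;
        for (long ts : eventTimestamps) {
            long start = windowStart(ts);
            // Events for windows the watermark has already passed are late; this model drops them.
            if (start + WINDOW_MS - 1 > watermark) {
                open.add(start);
            }
            watermark = Math.max(watermark, ts - 1);
            // Fire every open window whose last millisecond the watermark has reached.
            while (!open.isEmpty() && open.first() + WINDOW_MS - 1 <= watermark) {
                fired.add(open.pollFirst());
            }
        }
        return fired; // windows still in `open` are waiting for a newer event
    }
}
```

With events at 2:33:10 and 2:33:20 (9_190_000 and 9_200_000 ms after midnight) nothing fires; adding an event at 2:34:10 fires the 2:33 window, and the 2:34 window then waits for the next newer event, which matches the t / t+1 pattern described above.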
question from:
https://stackoverflow.com/questions/65559601/apache-flink-does-not-return-data-for-idle-partitions