Here is my code. My questions are as follows:
Is it correct to clear state in this way?
Is this the correct way to use keyBy?
// There are more than 1,000,000 distinct storeId values
orderStream.keyBy(Order::getStoreId)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(16)))
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(1)))
    .evictor(TimeEvictor.of(Time.seconds(0), true))
    .process(new ProcessWindowFunction<Order, Object, Long, TimeWindow>() {
        MapState<Long, Long> storeCountState;

        @Override
        public void process(Long storeId, Context context, Iterable<Order> elements, Collector<Object> out) throws Exception {
            long sum = 0L;
            for (Order element : elements) {
                sum++;
            }
            storeCountState.put(storeId, storeCountState.get(storeId) + sum);
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            MapStateDescriptor<Long, Long> mapStateDescriptor = new MapStateDescriptor();
            storeCountState = getRuntimeContext().getMapState(mapStateDescriptor);
        }

        @Override
        public void close() throws Exception {
            super.close();
            // I clear state when each window close
            storeCountState.clear();
        }
    })
    .addSink(new PrintSinkFunction<>());