I'm setting up a slow-changing lookup Map in my Apache-Beam pipeline. It continuously updates the lookup map. For each key in lookup map, I retrieve the latest value in the global window with accumulating mode.
But it always meets Exception :
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Duplicate values for mykey
Is anything wrong with this snippet code?
If I use .discardingFiredPanes()
instead, I will lose information in the last emit.
pipeline
.apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1L)))
.apply(
Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()))
.accumulatingFiredPanes())
.apply(new ReadSlowChangingTable())
.apply(Latest.perKey())
.apply(View.asMap());
Example Input Trigger:
t1 : KV<k1,v1> KV< k2,v2>
t2 : KV<k1,v1>
accumulatingFiredPanes
=> expected result at t2 => KV(k1,v1), KV(k2,v2) but failed due to duplicated exception
discardingFiredPanes
=> expected result at t2 => KV(k1,v1) Success
See Question&Answers more detail:
os 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…