I want to write code that skips processing in the current batch but picks that data up in the next batch. For example, my code looks like this:
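import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

// Offset ranges of the current batch, committed after processing finishes
var offsetRanges = Array.empty[OffsetRange]

val messages = stream.transform { rdd =>
  // Extract offset ranges to be committed after the processing is finished
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  if (offsetRanges != null) {
    // mkString builds the message; foreach inside ${...} would only log "()"
    log.info(s"Starting to process offset ranges ${offsetRanges.mkString(", ")}")
  }
  rdd
}.map(record => record.value)

messages.foreachRDD { rdd =>
  // Process (and commit) only when cond holds; otherwise the batch is skipped
  if (cond) {
    processAndCommit(rdd)
  }
}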
The RDD is processed only when cond is true. If cond is false, the RDD is not processed, but internally Spark still treats the batch as done, so the next batch starts from the newer offsets. I want the previous (skipped) offsets to be picked up in the next batch as well. Is there any way to achieve that?
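A simplified model of the behavior described above may help. It assumes (this is my understanding of the spark-streaming-kafka-0-10 direct stream, not something the code above shows) that the driver tracks the next offset to read on its own and advances it every batch, while the committed offset moves only when the job commits. `SimulatedDirectStream` and `SkipDemo` are hypothetical names; ranges are half-open [from, until) like Spark's `OffsetRange`, so offsets 1->5 are [1, 6).

```scala
// Hypothetical, simplified model of a direct Kafka stream (not Spark code).
// Assumption: the stream's read position advances every batch regardless of
// commits; the committed offset only changes when commit() is called.
final class SimulatedDirectStream(private var nextOffset: Long) {
  // Last committed offset; starts where reading starts
  var committed: Long = nextOffset

  /** Returns the half-open range [from, until) handed to the next batch. */
  def nextBatch(newRecords: Long): (Long, Long) = {
    val range = (nextOffset, nextOffset + newRecords)
    nextOffset = range._2 // advances whether or not the batch is processed
    range
  }

  /** Models a commit of everything before `until`. */
  def commit(until: Long): Unit = committed = until
}

object SkipDemo {
  def main(args: Array[String]): Unit = {
    val stream = new SimulatedDirectStream(nextOffset = 1L)

    // Batch 1: offsets 1->5, cond is false, so nothing is processed or committed
    val (from1, until1) = stream.nextBatch(5)

    // Batch 2: starts at 6 even though 1->5 was never committed
    val (from2, until2) = stream.nextBatch(5)

    // cond is true: processAndCommit commits up to the end of batch 2
    stream.commit(until2)

    println(s"batch 1: [$from1, $until1), batch 2: [$from2, $until2), committed: ${stream.committed}")
  }
}
```

In this model, skipping the commit does not rewind the running stream, and once batch 2 commits up to 11 the skipped offsets 1->5 are not recovered even after a restart.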
Illustration:
Batch 1:
  offset range 1->5
  cond is false
  the processAndCommit block is skipped, so offsets 1->5 are not processed
Batch 2:
  offset range 6->10
  cond is true
  the processAndCommit block runs, so offsets 6->10 are processed
I want the next cycle (Batch 2) to pick up offset range 1->10, not just 6->10.
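One way to get the 1->10 behavior, sketched under assumptions: remember the ranges of skipped batches on the driver and widen them with each new batch until cond is true. `OffsetSpan` and `PendingRanges` are hypothetical stand-ins (`OffsetSpan` mirrors the `fromOffset`/`untilOffset` fields of Spark's `OffsetRange`, with `untilOffset` exclusive), and the merge assumes consecutive batches of a partition are contiguous.

```scala
// Hypothetical stand-in for Spark's OffsetRange: fromOffset is inclusive,
// untilOffset is exclusive, so offsets 1->5 are OffsetSpan(0, 1, 6).
final case class OffsetSpan(partition: Int, fromOffset: Long, untilOffset: Long)

// Remembers the ranges of skipped batches and widens them with each new
// batch until a batch is finally processed.
final class PendingRanges {
  // Skipped ranges per partition, waiting to be processed
  private var pending = Map.empty[Int, OffsetSpan]

  /** Ranges to process for the current batch: skipped ranges widened by `current`. */
  def rangesFor(current: Seq[OffsetSpan]): Seq[OffsetSpan] = {
    val byPartition = current.map(c => c.partition -> c).toMap
    (pending.keySet ++ byPartition.keySet).toSeq.sorted.flatMap { part =>
      (pending.get(part), byPartition.get(part)) match {
        // Assumes the skipped range ends where the current one starts
        case (Some(p), Some(c)) => Some(OffsetSpan(part, p.fromOffset, c.untilOffset))
        // Only one side exists for this partition: keep it unchanged
        case (p, c) => p.orElse(c)
      }
    }
  }

  /** Call when cond is false: keep the widened ranges for a later batch. */
  def skip(current: Seq[OffsetSpan]): Unit =
    pending = rangesFor(current).map(r => r.partition -> r).toMap

  /** Call after processAndCommit succeeds: nothing is pending any more. */
  def processed(): Unit =
    pending = Map.empty
}

object PendingRangesDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new PendingRanges

    // Batch 1: offsets 1->5 ([1, 6)), cond is false, so remember them
    tracker.skip(Seq(OffsetSpan(0, 1, 6)))

    // Batch 2: offsets 6->10 ([6, 11)), cond is true, so process 1->10 instead
    val toProcess = tracker.rangesFor(Seq(OffsetSpan(0, 6, 11)))
    println(s"processing $toProcess")
    tracker.processed()
  }
}
```

In a real job, the merged ranges could be turned back into `OffsetRange.create(topic, partition, fromOffset, untilOffset)` values and read with `KafkaUtils.createRDD` from spark-streaming-kafka-0-10, then processed and committed together. That wiring is not shown here and is untested, and it only works while the skipped records are still within Kafka's retention period.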
question from:
https://stackoverflow.com/questions/65845075/need-to-skip-the-batch-processing-for-this-cycle-but-do-that-in-next-one-in-spar