I have a topology that does the following:
- reads from a source topic
- adds a header containing the current timestamp to each record
- accumulates incoming records in a state store until some limit is reached, then floods the downstream topology with the batch of accumulated records
- sends the records to a sink topic
It looks something like this:
sourceTopic ->
    transformer (context.headers.add("timeStamp", System.currentTimeMillis())) ->
    transformer (
        stateStore.put(key, value)
        if (stateStore.approximateNumEntries() >= limit) {
            stateStore.all().forEach(keyValue -> processorContext.forward(keyValue.key, keyValue.value))
        }
    ) ->
    sinkTopic
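For context, a Kafka header value is a byte array, so the `headers.add` call in the pseudocode above stands for something that first encodes the timestamp. A minimal sketch of that encoding using only the JDK; the class and method names here are hypothetical and not the actual code from the topology:

```java
import java.nio.ByteBuffer;

// Hypothetical helper: converts an epoch-millis timestamp to and from
// the 8-byte big-endian form that is stored as the header value.
final class TimestampHeaderCodec {

    // Encode the timestamp as exactly Long.BYTES (8) bytes.
    static byte[] encode(long epochMillis) {
        return ByteBuffer.allocate(Long.BYTES).putLong(epochMillis).array();
    }

    // Decode the first 8 bytes back into the timestamp.
    static long decode(byte[] headerValue) {
        return ByteBuffer.wrap(headerValue).getLong();
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        byte[] value = encode(now);
        System.out.println(value.length + " bytes, round trip ok: " + (decode(value) == now));
    }
}
```

The first transformer would pass `encode(System.currentTimeMillis())` as the header value, and the interceptor reads it back with the matching `ByteBuffer.wrap(...).getLong()`.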
The problem is that after the second transformer (the one that accumulates events), the header added in the first transformer is no longer populated. I have a custom ProducerInterceptor implementation that relies on this header, and it throws a NullPointerException:
@Override
public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) {
    long currentTime = System.currentTimeMillis();
    // The NullPointerException comes from this line when the header is absent
    byte[] startTimestampByte = record.headers().lastHeader(timeStamp).value();
    long startTimestampLong = ByteBuffer.wrap(startTimestampByte).getLong();
    log.info("Duration of event processing {}", currentTime - startTimestampLong);
    return record;
}
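As a stopgap, the interceptor could at least tolerate a missing header instead of throwing. This does not solve the propagation problem, it only avoids the exception. A minimal sketch using only the JDK; `HeaderDurations` and `elapsedMillis` are hypothetical names, and the caller is assumed to pass `null` when `lastHeader(...)` returns no header:

```java
import java.nio.ByteBuffer;
import java.util.OptionalLong;

// Hypothetical helper: computes the processing duration only when a
// well-formed 8-byte timestamp header value is available.
final class HeaderDurations {

    static OptionalLong elapsedMillis(byte[] startTimestampBytes, long nowMillis) {
        // A missing or truncated header value yields an empty result instead of an exception.
        if (startTimestampBytes == null || startTimestampBytes.length < Long.BYTES) {
            return OptionalLong.empty();
        }
        long start = ByteBuffer.wrap(startTimestampBytes).getLong();
        return OptionalLong.of(nowMillis - start);
    }
}
```

In `onSend`, the header itself would need a null check before calling `value()`, and the duration would be logged only when the returned `OptionalLong` is present.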
So the question is: is there any way to propagate custom headers when calling ProcessorContext.forward? Thanks in advance.