Yeah, this is quite a weird little catch-22, but the documentation is correct. The Processor for a global state store must not do anything to the records but persist them into the store.
AFAIK, this isn't a philosophical issue, just a practical one. The reason is simply the behavior you observe... Streams treats the input topic as a changelog topic for the store and therefore bypasses the processor (as well as deserialization) during restoration.
The reason that state restoration bypasses any processing is that usually the data in a changelog is identical to the data in the store, so it would actually be wrong to do anything new to it. Plus, it's more efficient just to take the bytes off the wire and bulk-write them into the state stores. I say "usually" because in this case, the input topic isn't exactly like a normal changelog topic, in that it doesn't receive its writes during store puts.
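To make the mismatch concrete, here's a toy sketch in plain Java. It is not Kafka Streams code: the class, the `transform` step, and the list standing in for a topic are all made up for illustration. It only shows why a processor that changes records leaves the store in a different state after restoration than after normal processing.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Toy model only: none of these types are Kafka Streams classes.
final class RestoreBypassDemo {

    // Hypothetical processor logic that changes the value before storing it.
    static String transform(String value) {
        return value.toUpperCase(Locale.ROOT);
    }

    // Normal processing: every record goes through the processor first.
    static Map<String, String> processLive(List<Map.Entry<String, String>> topic) {
        Map<String, String> store = new HashMap<>();
        for (Map.Entry<String, String> record : topic) {
            store.put(record.getKey(), transform(record.getValue()));
        }
        return store;
    }

    // Restoration: records are copied from the topic into the store untouched.
    static Map<String, String> restore(List<Map.Entry<String, String>> topic) {
        Map<String, String> store = new HashMap<>();
        for (Map.Entry<String, String> record : topic) {
            store.put(record.getKey(), record.getValue());
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> topic = List.of(Map.entry("k1", "hello"));
        System.out.println("live:     " + processLive(topic));
        System.out.println("restored: " + restore(topic));
    }
}
```

The two stores disagree for the same input, which is exactly the situation the documentation is warning about.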
For what it's worth, I also struggle to understand the use case. Seemingly, we should either:
- Get rid of that processor entirely, and always just dump the binary data off the wire into the stores, just like restoration does.
- Re-design global stores to allow arbitrary transformations before the global store. We could either:
  - continue to use the input topic and deserialize and invoke the processors during restoration as well, OR
  - add a real changelog for global stores, such that we'd poll the input topic, apply some transformations, then write to the global store and the global-store-changelog. Then, we can use the changelog (not the input) for restoration and replication.
By the way, if you want the latter behavior, you can approximate it right now by applying your transformations and then using `to(my-global-changelog)` to manufacture a "changelog" topic. Then, you'd create the global store to read from your `my-global-changelog` instead of the input.
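A minimal sketch of that workaround with the Streams DSL might look like the following. The names `input-topic` and `my-global-store`, the String serdes, and the trimming transformation are placeholders I've assumed for illustration; substitute your own topic, types, and logic.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

final class GlobalChangelogWorkaround {

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // 1. Transform the raw input and write the result to a dedicated topic.
        //    Null values (tombstones) are passed through unchanged so that
        //    deletes still reach the global store.
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> value == null ? null : value.trim())
               .to("my-global-changelog", Produced.with(Serdes.String(), Serdes.String()));

        // 2. Build the global store from the already-transformed topic, so the
        //    bytes replayed during restoration match what the store expects.
        builder.globalTable(
                "my-global-changelog",
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("my-global-store"));

        return builder.build();
    }
}
```

Because the transformation happens before the data lands in `my-global-changelog`, it no longer matters that restoration skips processing.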
So, to give you a direct answer, KAFKA-7663 is not a bug. I'll comment on the ticket proposing to turn it into a feature request.
Bonus answer: Topics that act as changelogs for state stores must not be configured with time- or size-based retention. Practically speaking, this means you should prevent infinite growth by enabling compaction and disabling log retention.
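As a rough illustration, the topic-level settings could be collected like this. The class and method names are mine; the config keys are the standard Kafka topic configs, and the resulting map is the kind of thing you could hand to an admin tool or client when creating or altering the topic.

```java
import java.util.Map;

final class ChangelogTopicConfig {

    // Topic-level settings for a topic that backs a (global) state store.
    static Map<String, String> compactedTopicConfig() {
        return Map.of(
            // Compaction only: keep the latest record per key, never delete by age or size.
            "cleanup.policy", "compact",
            // Only consulted under the "delete" policy; set to unlimited here
            // purely to make the intent explicit.
            "retention.ms", "-1");
    }
}
```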
In practice, old data falling out of retention and getting dropped is not an "event", and consumers have no way of knowing if/when it happens. Therefore, it's not possible to remove data from the state stores in response to this non-event. It would happen as you describe... the records would just sit there in the global store indefinitely. If/when an instance is replaced, the new one would restore from the input and (obviously) only receive records that exist in the topic at that time. Thus, the Streams cluster as a whole would wind up with an inconsistent view of the global state. That's why you should disable retention.
The right way to "drop" old data from the store would be to just write a tombstone for the desired key into the input topic. This would then be correctly propagated to all members of the cluster, applied correctly during restoration, AND correctly compacted by the brokers.
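Here's a toy simulation of the two ways of "removing" data, again in plain Java with made-up names rather than real Kafka classes. The list plays the role of the topic, each map plays the role of one instance's global store, and a record with a `null` value is a tombstone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: the "topic" is a list and each "instance" is a map.
final class RetentionVsTombstoneDemo {

    // Apply one record to a store; a null value is a tombstone (delete).
    static void apply(Map<String, String> store, String key, String value) {
        if (value == null) {
            store.remove(key);
        } else {
            store.put(key, value);
        }
    }

    // A fresh instance rebuilds its store from whatever the topic holds right now.
    static Map<String, String> restore(List<String[]> topic) {
        Map<String, String> store = new HashMap<>();
        for (String[] record : topic) {
            apply(store, record[0], record[1]);
        }
        return store;
    }

    static List<String[]> sampleTopic() {
        List<String[]> topic = new ArrayList<>();
        topic.add(new String[] {"a", "1"});
        topic.add(new String[] {"b", "2"});
        return topic;
    }

    // Scenario A: retention drops the oldest record; running instances never hear about it.
    static boolean consistentAfterRetention() {
        List<String[]> topic = sampleTopic();
        Map<String, String> oldInstance = restore(topic);
        topic.remove(0); // broker-side cleanup, no record is delivered for this
        Map<String, String> newInstance = restore(topic);
        return oldInstance.equals(newInstance);
    }

    // Scenario B: a tombstone for "a" is written to the topic and consumed like any record.
    static boolean consistentAfterTombstone() {
        List<String[]> topic = sampleTopic();
        Map<String, String> oldInstance = restore(topic);
        String[] tombstone = {"a", null};
        topic.add(tombstone);
        apply(oldInstance, tombstone[0], tombstone[1]); // running instance sees the delete
        Map<String, String> newInstance = restore(topic);
        return oldInstance.equals(newInstance);
    }

    public static void main(String[] args) {
        System.out.println("consistent after retention: " + consistentAfterRetention());
        System.out.println("consistent after tombstone: " + consistentAfterTombstone());
    }
}
```

In the retention scenario the old instance still holds key `a` while the replacement never sees it; in the tombstone scenario both instances end up with the same contents.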
I hope this all helps. Definitely, please chime in on the ticket and help us shape the API to be more intuitive!