lovoo/goka

[Production Usage] Question: How can I modify the state of the whole table triggered by an input message?

We are using Goka in production and have encountered a scenario where we need to clean up the state in a table topic when processing a specific message from our input topic. The use case involves two types of messages:

  • Data Packet: This message is used to update the state for a specific key, typically by appending new information to the existing state. For example: { Key: user, Value: data } => Goka Processor => { Key: user, Value: []data{data1, data2, data3} }

  • Delete Message: This message requires us to remove data across many keys.

The challenge we’re facing is as follows:

  • We want to maintain at-least-once processing semantics.

  • Upon receiving the delete message, we need to iterate over the entire table (preferably for that partition) and remove the relevant data from each key’s state. Then, we update the state for those keys.

  • We attempted to use the experimental visitor + VisitAll function, but it seems this approach doesn't work as expected. The VisitAll function generates internal events that are not processed until the processor releases the thread for handling the delete message. This creates a problem, as we need to read the delete message, update all affected states, and only then commit the offset for the delete message to ensure proper at-least-once semantics.

What would be the recommended approach to handle this?

I have this use case as well.

One difficulty I've hit when using p.VisitAll for the kind of multi-key cleanup @akshatraika-moment describes: we don't want to block the process callback (and with it the partition processor) until p.VisitAll returns, but we also don't want to commit the cleanup message's offset until p.VisitAll returns.

If we block the processor, we find ourselves in a deadlock. p.VisitAll does not return until all of the visit events have been processed, but some of the visit events will never be processed because one of the partition processors is blocked on p.VisitAll.

But if we don't block the process callback/partition processor, then we lose at-least-once semantics. If the processor shuts down or rebalances after the multi-key cleanup message's offset is committed but before we've finished cleaning up, some of the visit events are lost forever.

We've considered using ctx.DeferCommit to maintain at-least-once semantics, but are concerned we would build up a large queue of uncommitted offsets. The idea would be to hold back every commit that follows a multi-key cleanup message until p.VisitAll returns. Because our input topic has high throughput and a lot of unique keys, that queue could grow large while the cleanup runs.
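
A rough sketch of that idea, to show where the queue comes from. `CommitGate` and its methods are hypothetical and not part of Goka. The commit callbacks are assumed to have type `func(error)`, as the function returned by ctx.DeferCommit is assumed to, and the example passes plain closures rather than real Goka contexts.

```go
package main

import (
	"fmt"
	"sync"
)

// CommitGate holds back commit callbacks while a cleanup is in flight.
type CommitGate struct {
	mu      sync.Mutex
	cleanup bool
	held    []func(error)
}

// BeginCleanup marks the start of a multi-key cleanup.
func (g *CommitGate) BeginCleanup() {
	g.mu.Lock()
	g.cleanup = true
	g.mu.Unlock()
}

// Commit runs commit right away when no cleanup is in flight,
// otherwise it queues the callback until EndCleanup.
func (g *CommitGate) Commit(commit func(error)) {
	g.mu.Lock()
	if g.cleanup {
		g.held = append(g.held, commit)
		g.mu.Unlock()
		return
	}
	g.mu.Unlock()
	commit(nil)
}

// Held returns the number of commits currently held back.
func (g *CommitGate) Held() int {
	g.mu.Lock()
	defer g.mu.Unlock()
	return len(g.held)
}

// EndCleanup releases all held commits in arrival order, passing err
// (nil on success) to each, and returns how many were released.
func (g *CommitGate) EndCleanup(err error) int {
	g.mu.Lock()
	held := g.held
	g.held = nil
	g.cleanup = false
	g.mu.Unlock()
	for _, commit := range held {
		commit(err)
	}
	return len(held)
}

func main() {
	var g CommitGate
	committed := 0
	commit := func(error) { committed++ }

	g.Commit(commit) // no cleanup running: committed immediately
	g.BeginCleanup()
	for i := 0; i < 1000; i++ {
		g.Commit(commit) // messages arriving during cleanup pile up
	}
	fmt.Println("held during cleanup:", g.Held())
	fmt.Println("released:", g.EndCleanup(nil), "committed:", committed)
}
```

The held slice grows by one entry per message processed during the cleanup, which is exactly the memory and reprocessing cost we are worried about at high throughput.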

We would greatly appreciate any advice! Thank you.