Is there a way to get a list of the keys associated with a processor instance?
oliviabarnett opened this issue · 2 comments
goka newbie here --
I have a goka setup working that allows me to batch actions coming in under a certain key and submit if the length of actions in the batch exceeds 10, but I also want to add a timer functionality that runs in a loop externally and dumps the batch of actions every 30 seconds. I think I can make this happen with the use of a control message like this:
func (uq *userActionQueue) userActionsProcessing(ctx goka.Context, msg any) {
	if err != nil {
		return
	}
	batch := uq.getOrCreateBatch(ctx)
	// If a control message is identified, submit batch
	controlMsg, ok := msg.(*ControlMessage)
	if ok && controlMsg.Action == TimedSubmitBatchAction {
		uq.submitBatch(ctx.Context(), batch)
		batch.Clear()
		ctx.SetValue(batch)
		return
	}
	event, ok := msg.(*UserActionsEvent)
	if !ok {
		return
	}
	// Store item, check length of batch and potentially submit
	...
I want to do something like this for the timer:
go func() {
	// Fire every 30 seconds, matching the flush interval described above.
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	// Placeholder: the keys this processor instance is responsible for.
	keys := []string{"?"}
	for range ticker.C {
		for _, key := range keys {
			submitBatchEvent := ControlMessage{Action: TimedSubmitBatchAction}
			err := uq.userActionEmitter.EmitSync(key, submitBatchEvent)
			if err != nil {
				return
			}
		}
	}
}()
but obviously, this requires me to have a list of the keys for which this processor instance is responsible. I know I can get information about the keys from a view using the group table topic, but won't that give me all of the keys across all processors? Clearly the processor knows which keys it is responsible for because it is able to load up that local state after a crash, so I am wondering if I can get access to this list somehow? But I could also be misunderstanding how this works.
Hey Olivia,
welcome to goka :). Since goka is quite tightly coupled to Kafka, some background may help. Kafka topics divide their data into partitions. When goka starts a new Processor, it acts as a Kafka group consumer, i.e. all members of the group split the partitions among themselves and Kafka takes care of that distribution. A goka View, however, just creates one consumer per partition that dumps all messages to local disk so they can be used for whatever is necessary.
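To make the partitioning concrete, here is a minimal sketch of how a key can map to a partition and a partition to a group member. It assumes an FNV-1a hash taken modulo the partition count, which is a common scheme but not necessarily the one your setup uses. The names partitionFor and ownerOf and the instance names are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to a partition by hashing it and taking the
// result modulo the partition count. FNV-1a is an assumption here; what
// matters is that the mapping is deterministic for a given key.
func partitionFor(key string, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

// ownerOf returns the (hypothetical) group member that currently holds a
// partition, or "" if no member holds it.
func ownerOf(partition int32, assignment map[string][]int32) string {
	for member, parts := range assignment {
		for _, p := range parts {
			if p == partition {
				return member
			}
		}
	}
	return ""
}

func main() {
	const numPartitions = 4
	// Hypothetical assignment; in practice Kafka decides this on rebalance.
	assignment := map[string][]int32{
		"instance-a": {0, 1},
		"instance-b": {2, 3},
	}
	for _, key := range []string{"user-1", "user-2", "user-3"} {
		p := partitionFor(key, numPartitions)
		fmt.Printf("%s -> partition %d -> %s\n", key, p, ownerOf(p, assignment))
	}
}
```

The point of the sketch is that a key is never assigned to an instance directly: it belongs to a partition, and the partition's owner can change whenever the group rebalances.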
So you're right, we can't know which processor instance is responsible for which key. But if you iterate over a View, all keys will be included. If those keys are then sent via an Emitter to the Processor, they will end up in the correct partition, and the processor instance responsible for that partition will process them.
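The view-plus-emitter approach described above could be sketched roughly like this. To keep the example runnable without a Kafka cluster, it uses two small hypothetical interfaces (keyIterator, syncEmitter) and in-memory fakes instead of goka's own types. ControlMessage and TimedSubmitBatchAction are taken from the question, but their definitions here are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// ControlMessage mirrors the type named in the question; the string-typed
// Action field is an assumption.
type ControlMessage struct{ Action string }

const TimedSubmitBatchAction = "timed-submit-batch"

// keyIterator is a hypothetical, minimal stand-in for a view iterator.
type keyIterator interface {
	Next() bool
	Key() string
	Err() error
}

// syncEmitter is a hypothetical stand-in for an emitter with EmitSync.
type syncEmitter interface {
	EmitSync(key string, msg interface{}) error
}

// flushAllKeys emits one control message per key seen by the iterator and
// returns how many were sent. It stops at the first emit error.
func flushAllKeys(it keyIterator, em syncEmitter) (int, error) {
	sent := 0
	for it.Next() {
		msg := &ControlMessage{Action: TimedSubmitBatchAction}
		if err := em.EmitSync(it.Key(), msg); err != nil {
			return sent, fmt.Errorf("emit control message for key %q: %w", it.Key(), err)
		}
		sent++
	}
	return sent, it.Err()
}

// sliceIterator is an in-memory keyIterator used for illustration.
type sliceIterator struct {
	keys []string
	pos  int
}

func (s *sliceIterator) Next() bool  { s.pos++; return s.pos <= len(s.keys) }
func (s *sliceIterator) Key() string { return s.keys[s.pos-1] }
func (s *sliceIterator) Err() error  { return nil }

// recordingEmitter records emitted keys; a non-empty failOn simulates an
// emit failure for that key.
type recordingEmitter struct {
	keys   []string
	failOn string
}

func (r *recordingEmitter) EmitSync(key string, msg interface{}) error {
	if r.failOn != "" && key == r.failOn {
		return errors.New("simulated emit failure")
	}
	r.keys = append(r.keys, key)
	return nil
}

func main() {
	it := &sliceIterator{keys: []string{"user-1", "user-2", "user-3"}}
	em := &recordingEmitter{}
	n, err := flushAllKeys(it, em)
	fmt.Println(n, err, em.keys)
}
```

In a real setup the fakes would be replaced by an iterator over the View on the group table and by the emitter already used in the question. Whichever instance runs this loop covers every key in the table, so running it on several instances would send each control message several times.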
What you're implementing is essentially a count-based windowing mechanism, right? Just note that goka currently does not support this very well, so you have to fall back to iteration-based solutions via views etc. The problem with those is, first, that they do not scale very well (e.g. iterating over a couple of million entries just takes too long) and, second, that they introduce all kinds of race conditions, duplications etc. But maybe it's fine for your use case; I just wanted to point that out.
Hope that helps, sorry for the delay!
Hey! Thanks for the response. Ok, got it -- this makes sense. The duplicate message issue is something we are trying to design around in a reliable and scalable way. For now, I think I can find a workaround by having the timer live in a singleton instance, but this is definitely something I would want if support for it ever becomes available in goka!