Stopping simultaneously fails
frairon opened this issue · 0 comments
frairon commented
In most cases, when multiple processors in the same application (a system test, in this case) are shut down at the same time by cancelling the shared runner context, one of them fails shutting down.

This unit test reproduces the behavior:
```go
func TestManyProcs(t *testing.T) {
	var (
		group       goka.Group = goka.Group(fmt.Sprintf("%s-%d", "goka-systemtest-manyprocs", time.Now().Unix()))
		inputStream string     = string(group) + "-input"
		joinTable   goka.Table = goka.Table(fmt.Sprintf("%s-join", group))
	)

	brokers := initSystemTest(t)
	tmc := goka.NewTopicManagerConfig()
	tmc.Table.Replication = 1
	cfg := goka.DefaultConfig()
	tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
	require.NoError(t, err)
	err = tm.EnsureStreamExists(inputStream, 20)
	require.NoError(t, err)
	err = tm.EnsureTableExists(string(joinTable), 20)
	require.NoError(t, err)
	require.NoError(t, tm.Close())
	time.Sleep(1 * time.Second)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	errg, ctx := multierr.NewErrGroup(ctx)

	var procs []*goka.Processor

	inputEmitter, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String))
	require.NoError(t, err)
	defer inputEmitter.Finish()

	joinEmitter, err := goka.NewEmitter(brokers, goka.Stream(joinTable), new(codec.String))
	require.NoError(t, err)
	defer joinEmitter.Finish()

	// keep emitting to both topics until the context is cancelled
	errg.Go(func() error {
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return nil
			default:
			}
			require.NoError(t, inputEmitter.EmitSync(fmt.Sprintf("key-%d", i), "value"))
			require.NoError(t, joinEmitter.EmitSync(fmt.Sprintf("key-%d", i), "value"))
			time.Sleep(10 * time.Millisecond)
		}
	})

	createAndRunProc := func(id int) {
		proc, err := goka.NewProcessor(brokers,
			goka.DefineGroup(
				group,
				goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) {
					ctx.SetValue(msg)
					log.Printf("message")
				}),
				goka.Join(joinTable, new(codec.String)),
				goka.Persist(new(codec.String)),
			),
			goka.WithHotStandby(),
			goka.WithStorageBuilder(storage.MemoryBuilder()),
		)
		require.NoError(t, err)
		errg.Go(func() error {
			log.Printf("Starting processor %d", id)
			err := proc.Run(ctx)
			if err != nil {
				return fmt.Errorf("error running proc %d: %w", id, err)
			}
			return nil
		})
		procs = append(procs, proc)
	}

	for i := 0; i < 20; i++ {
		createAndRunProc(i)
	}

	pollTimed(t, "all procs recovered", 25.0, func() bool {
		for _, proc := range procs {
			if !proc.Recovered() {
				return false
			}
		}
		return true
	})

	time.Sleep(5 * time.Second)
	cancel()
	require.NoError(t, errg.Wait().ErrorOrNil())
}
```
In production this will usually not occur, but it still can. A first analysis showed that the cause is the final `CommitOffset` when the `ConsumerGroup` is shutting down: one of the instances fails, which may be connected to that instance being the group leader.