lovoo/goka

Stopping simultaneously fails

frairon opened this issue · 0 comments

In most cases, when multiple processors in the same application (system test, in this case), are shut down at the same time by closing the runner-ctx - one of them fails shutting down.

this unit test reproduces the behavior:

func TestManyProcs(t *testing.T) {
	var (
		group       goka.Group = goka.Group(fmt.Sprintf("%s-%d", "goka-systemtest-manyprocs", time.Now().Unix()))
		inputStream string     = string(group) + "-input"
		joinTable   goka.Table = goka.Table(fmt.Sprintf("%s-join", group))
	)

	brokers := initSystemTest(t)

	tmc := goka.NewTopicManagerConfig()
	tmc.Table.Replication = 1
	cfg := goka.DefaultConfig()
	tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
	require.NoError(t, err)

	err = tm.EnsureStreamExists(inputStream, 20)
	require.NoError(t, err)
	err = tm.EnsureTableExists(string(joinTable), 20)
	require.NoError(t, err)

	require.NoError(t, tm.Close())

	time.Sleep(1 * time.Second)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	errg, ctx := multierr.NewErrGroup(ctx)
	var procs []*goka.Processor

	inputEmitter, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String))
	require.NoError(t, err)
	defer inputEmitter.Finish()
	joinEmitter, err := goka.NewEmitter(brokers, goka.Stream(joinTable), new(codec.String))
	require.NoError(t, err)
	defer joinEmitter.Finish()
	errg.Go(func() error {
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return nil
			default:
			}
			require.NoError(t, inputEmitter.EmitSync(fmt.Sprintf("key-%d", i), "value"))
			require.NoError(t, joinEmitter.EmitSync(fmt.Sprintf("key-%d", i), "value"))
			time.Sleep(10 * time.Millisecond)
		}
	})

	createAndRunProc := func(id int) {
		proc, err := goka.NewProcessor(brokers,
			goka.DefineGroup(
				group,
				goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) {
					ctx.SetValue(msg)
					log.Printf("messagage")
				}),
				goka.Join(joinTable, new(codec.String)),
				goka.Persist(new(codec.String)),
			),
			goka.WithHotStandby(),
			goka.WithStorageBuilder(storage.MemoryBuilder()),
		)
		require.NoError(t, err)

		errg.Go(func() error {
			log.Printf("Starting processor %d", id)
			err := proc.Run(ctx)
			if err != nil {
				return fmt.Errorf("error running proc %d: %w", id, err)
			}
			return nil
		})
		procs = append(procs, proc)
	}

	for i := 0; i < 20; i++ {
		createAndRunProc(i)
	}

	pollTimed(t, "procs 1&2 recovered", 25.0, func() bool {
		for _, proc := range procs {
			if !proc.Recovered() {
				return false
			}
		}
		return true
	})

	time.Sleep(5 * time.Second)

	cancel()
	require.NoError(t, errg.Wait().ErrorOrNil())
}

In production, this will usually not occur - but it still can. First analysis showed that the reason is the final CommitOffset when the ConsumerGroup is shutting down. One of the instances fails, maybe it is connected to being leader.