modernice/goes

Projection finalizers

Closed this issue · 1 comments

Currently, projection finalization is implemented like this:

package example

type Foo struct { ... }

func (*Foo) ApplyEvent(event.Event) {}

func (f *Foo) finalize(ctx context.Context, dep SomeDependency) error {
  // do stuff
  if err := dep.Do(ctx, "..."); err != nil {
    return err
  }
  // do more stuff
  return nil
}

func example(s projection.Schedule) {
  var dep SomeDependency

  s.Subscribe(context.TODO(), func(ctx projection.Job) error {
    refs, errs, err := ctx.Aggregates(ctx)
    if err != nil {
      return fmt.Errorf("extract aggregates: %w", err)
    }

    return streams.Walk(ctx, func(ref aggregate.Ref) error {
      foo := NewFoo(ref.ID)

      if err := ctx.Apply(ctx, foo); err != nil {
        return err
      }

      return foo.finalize(ctx, dep)
    }, refs, errs)
  })
}

Finalization is done for each individual projection after the job applies its events. A nice addition would be if finalization could be batched and deferred to the end of the projection job, like this:

package example

func example(s projection.Schedule) {
  var dep SomeDependency

  s.Subscribe(context.TODO(), func(ctx projection.Job) error {
    refs, errs, err := ctx.Aggregates(ctx)
    if err != nil {
      return fmt.Errorf("extract aggregates: %w", err)
    }

    return streams.Walk(ctx, func(ref aggregate.Ref) error {
      foo := NewFoo(ref.ID)

      if err := ctx.Apply(ctx, foo); err != nil {
        return err
      }

      return ctx.Defer(func() error {
        // this call will be deferred to after this projection update
        return foo.finalize(ctx, dep)
      })
    }, refs, errs)
  })
}

Can already be implemented in user-land code pretty easily.