modernice/goes

Review projection APIs

Closed this issue · 3 comments

This issue determines whether the different projection APIs are actually necessary to solve common problems, or whether there are better solutions to these problems that don't require these APIs.

HistoryDependent

The HistoryDependent API is used by projection jobs to determine which events a projection needs to properly update itself. If a projection implements HistoryDependent, it can hint to a projection job that it requires the full history of the events that are configured in the job, instead of just the (published) events that triggered the job.
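The decision a job makes for a HistoryDependent projection can be sketched in plain Go. This is a self-contained illustration, not goes code: `Event`, `eventsFor` and `listing` are hypothetical stand-ins, and only the `RequiresFullHistory() bool` method mirrors the API described above.

```go
package main

import "fmt"

// Event is a hypothetical, minimal stand-in for a goes event.
type Event struct {
	Name string
	ID   int
}

// HistoryDependent mirrors the idea of the API: a projection can ask a job
// for the full history of the job's configured events.
type HistoryDependent interface {
	RequiresFullHistory() bool
}

// eventsFor returns the events a job would apply onto projection p: the full
// history if p asks for it, otherwise only the events that triggered the job.
func eventsFor(p any, history, triggered []Event) []Event {
	if hd, ok := p.(HistoryDependent); ok && hd.RequiresFullHistory() {
		return history
	}
	return triggered
}

// listing is a hypothetical projection that may require the full history.
type listing struct{ needsHistory bool }

func (l *listing) RequiresFullHistory() bool { return l.needsHistory }

func main() {
	history := []Event{{"product.created", 1}, {"product.priced", 2}}
	triggered := history[1:] // only the published Priced event

	fmt.Println(len(eventsFor(&listing{needsHistory: true}, history, triggered)))  // 2
	fmt.Println(len(eventsFor(&listing{needsHistory: false}, history, triggered))) // 1
	fmt.Println(len(eventsFor(struct{}{}, history, triggered)))                    // 1
}
```

Projections that don't implement the interface simply receive the triggering events.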

ProgressAware

The ProgressAware API is usually used by persistent projections that are stored in a database. When a projection job is triggered, the application fetches the current projection state from the database, applies the events on it and saves it back to the database. A ProgressAware projection keeps track of the last applied event in terms of the event time, to ensure that no event is applied twice to the projection.
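The fetch/apply/save cycle with time-based progress can be sketched without goes. In this hypothetical example, `OrderCount` stands in for a persistent read-model and its `LastTime` field plays the role of the tracked progress; re-applying the same job does not double-count.

```go
package main

import (
	"fmt"
	"time"
)

// Event is a hypothetical stand-in for an event; only the ID and time matter here.
type Event struct {
	ID   string
	Time time.Time
}

// OrderCount is a hypothetical persistent read-model that counts applied events.
type OrderCount struct {
	Count    int
	LastTime time.Time // progress: time of the last applied event
}

// ApplyJob applies events in order and skips every event that is not newer
// than the read-model's progress, so re-running a job never double-counts.
func (rm *OrderCount) ApplyJob(events []Event) {
	for _, evt := range events {
		if !evt.Time.After(rm.LastTime) {
			continue // already applied
		}
		rm.Count++
		rm.LastTime = evt.Time
	}
}

// at returns a time sec seconds after the Unix epoch.
func at(sec int) time.Time {
	return time.Unix(int64(sec), 0)
}

func main() {
	events := []Event{{"e1", at(1)}, {"e2", at(2)}}

	rm := &OrderCount{} // in a real app: fetched from the database
	rm.ApplyJob(events)
	rm.ApplyJob(events) // the same events are applied again
	fmt.Println(rm.Count) // 2
}
```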

HistoryDependent – Scenario: Product Listing

Consider an e-commerce app with two product events: Created (a product was created with a name) and Priced (a product was given a price). The app projects a product listing read-model from these two events, which are applied as follows:

  • A Created event adds a product (name) to the listing but doesn't add any price data.
  • A Priced event adds price data to a product in the listing.

Code Example

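package product

import (
  "context"
  "sync"

  "github.com/google/uuid"
  "github.com/modernice/goes/aggregate"
  "github.com/modernice/goes/event"
  "github.com/modernice/goes/helper/pick"
  "github.com/modernice/goes/projection"
  "github.com/modernice/goes/projection/schedule"
)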

const (
  Created = "product.created"
  Priced = "product.priced"
)

type Product struct { *aggregate.Base }

// Domain logic
func (p *Product) Create(name string) error { ... }
func (p *Product) SetPrice(price uint64) error { ... }

type Listing struct {
  mux sync.RWMutex
  products []ListingProduct
}

type ListingProduct struct {
  ID uuid.UUID
  Name string
  Price uint64
}

func (l *Listing) Products() []ListingProduct {
  l.mux.RLock()
  defer l.mux.RUnlock()

  out := make([]ListingProduct, 0, len(l.products))
  for _, p := range l.products {
    if p.Price > 0 {
      out = append(out, p)
    }
  }
  return out
}

// Project continuously projects the listing until ctx is canceled.
func (l *Listing) Project(ctx context.Context, bus event.Bus, store event.Store) (<-chan error, error) {
  s := schedule.Continuously(bus, store, []string{Created, Priced})
  errs, err := s.Subscribe(ctx, func(ctx projection.Job) error {
    l.mux.Lock()
    defer l.mux.Unlock()
    return ctx.Apply(ctx, l)
  })
  if err != nil {
    return nil, err
  }

  // Manually trigger a projection job to ensure all past events are applied.
  go s.Trigger(ctx)

  return errs, nil
}

func (l *Listing) ApplyEvent(evt event.Event) {
  switch evt.Name() {
  case Created:
    l.productCreated(evt)
  case Priced:
    l.productPriced(evt)
  }
}

func (l *Listing) productCreated(evt event.Event) {
  name := evt.Data().(string)

  productID := pick.AggregateID(evt)
  l.products = append(l.products, ListingProduct{
    ID: productID,
    Name: name,
  })
}

func (l *Listing) productPriced(evt event.Event) {
  // The Priced event carries the price as its data (see event.Of[uint64] below).
  price := evt.Data().(uint64)

  productID := pick.AggregateID(evt)

  for i, p := range l.products {
    if p.ID == productID {
      l.products[i].Price = price
      return
    }
  }
}

Problem

When a Priced event is published for a product whose Created event the projection has not received before, the listing cannot find the product, and the price update is silently dropped.
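The problem can be reproduced with a stripped-down, stdlib-only version of the listing. `Event`, `Product` and `Listing` here are hypothetical simplifications of the code above (a map instead of a slice, string IDs instead of UUIDs).

```go
package main

import "fmt"

// Event is a hypothetical, minimal stand-in for a goes event.
type Event struct {
	Name      string
	ProductID string
	Data      any // name (string) for Created, price (uint64) for Priced
}

type Product struct {
	Name  string
	Price uint64
}

// Listing is a simplified version of the listing projection above.
type Listing struct{ products map[string]*Product }

func NewListing() *Listing { return &Listing{products: make(map[string]*Product)} }

func (l *Listing) ApplyEvent(evt Event) {
	switch evt.Name {
	case "product.created":
		l.products[evt.ProductID] = &Product{Name: evt.Data.(string)}
	case "product.priced":
		// Without a prior Created event there is nothing to update.
		if p, ok := l.products[evt.ProductID]; ok {
			p.Price = evt.Data.(uint64)
		}
	}
}

func main() {
	l := NewListing()
	// The service started after the product was created, so the job
	// only receives the published Priced event.
	l.ApplyEvent(Event{"product.priced", "p1", uint64(999)})
	fmt.Println(len(l.products)) // 0: the price was dropped
}
```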

Current Solutions

Using manual schedule triggers

A possible solution is to manually trigger the projection on (service) startup. Manually triggered projection jobs always fetch the entire event history of the configured events from the store. The problem with this approach is that the initial projection job may cause (huge) performance problems if the history of the requested events is large.

Using HistoryDependent API

Using the HistoryDependent API, the listing could hint to the projection job that it requires the entire event history until the first projection job has been applied. But this effectively does the same as simply triggering the schedule on startup.

package product

type Listing struct {
  ...

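  // requiresFullHistory must start out as true (e.g. set in a constructor),
  // so that the first projection job fetches the full event history.
  requiresFullHistory bool
}

func (l *Listing) RequiresFullHistory() bool {
  return l.requiresFullHistory
}

func (l *Listing) applyJob(ctx projection.Job) error {
  l.mux.Lock()
  defer l.mux.Unlock()

  if err := ctx.Apply(ctx, l); err != nil {
    return err
  }

  // After the first job, the listing doesn't require the full history anymore.
  l.requiresFullHistory = false

  return nil
}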

Better Solution

A projection job should be configurable to push additional events into its event stream when needed. Something like this should be possible:

package product

func (l *Listing) Project(ctx context.Context, bus event.Bus, store event.Store) (<-chan error, error) {
  s := schedule.Continuously(bus, store, []string{Created, Priced})

  return s.Subscribe(
    ctx,
    func(ctx projection.Job) error {
      l.mux.Lock()
      defer l.mux.Unlock()
      return ctx.Apply(ctx, l)
    },

    // projection.BeforeEvent is a (proposed) subscription option
    projection.BeforeEvent(func(ctx context.Context, evt event.Of[uint64]) ([]event.Event, error) {
      productID := pick.AggregateID(evt)

      // If the listing already contains the product, no additional events need to be fetched.
      if l.containsProduct(productID) {
        return nil, nil
      }

      // Otherwise fetch the required `Created` event and apply it before the `Priced` event is applied.
      events, errs, err := store.Query(ctx, query.New(
        query.Name(Created),
        query.Aggregate("product", productID),
      ))
      if err != nil {
        return nil, err
      }

      return streams.Drain(ctx, events, errs)
    }, Priced), // event names are provided at the end to allow for intercepting different events with a single option
  )
}

// containsProduct reports whether the listing already contains the given product.
func (l *Listing) containsProduct(id uuid.UUID) bool {
  for _, p := range l.products {
    if p.ID == id {
      return true
    }
  }
  return false
}
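The effect of the proposed hook can be simulated in memory. This sketch does not use goes: the event store is a plain slice, and `ApplyWithBefore` is a hypothetical function that plays the role of `projection.BeforeEvent` by applying a product's missing Created event right before its Priced event.

```go
package main

import "fmt"

// Event is a hypothetical, minimal stand-in for a goes event.
type Event struct {
	Name      string
	ProductID string
	Data      any // name (string) for Created, price (uint64) for Priced
}

type Product struct {
	Name  string
	Price uint64
}

type Listing struct{ products map[string]*Product }

func NewListing() *Listing { return &Listing{products: make(map[string]*Product)} }

func (l *Listing) ApplyEvent(evt Event) {
	switch evt.Name {
	case "product.created":
		l.products[evt.ProductID] = &Product{Name: evt.Data.(string)}
	case "product.priced":
		if p, ok := l.products[evt.ProductID]; ok {
			p.Price = evt.Data.(uint64)
		}
	}
}

// ApplyWithBefore applies events onto l. Before a Priced event of a product
// the listing doesn't know yet, it looks up that product's Created event in
// the store and applies it first. Known products cause no extra lookups.
func ApplyWithBefore(l *Listing, store, events []Event) {
	for _, evt := range events {
		if evt.Name == "product.priced" {
			if _, known := l.products[evt.ProductID]; !known {
				for _, past := range store {
					if past.Name == "product.created" && past.ProductID == evt.ProductID {
						l.ApplyEvent(past)
						break
					}
				}
			}
		}
		l.ApplyEvent(evt)
	}
}

func main() {
	store := []Event{
		{"product.created", "p1", "Shoe"},
		{"product.priced", "p1", uint64(999)},
	}
	l := NewListing()
	// Only the published Priced event triggers the job.
	ApplyWithBefore(l, store, store[1:])
	fmt.Println(l.products["p1"].Name, l.products["p1"].Price) // Shoe 999
}
```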

Conclusion

The HistoryDependent API duplicates a feature that is already implemented in continuous schedules as manual triggers, and in periodic schedules as the default behavior (periodic schedules always fetch the entire event history from the store). Both the manual trigger feature and the HistoryDependent API lead to unnecessary queries of the entire event history (of the configured events). The proposed projection.BeforeEvent() option solves this scenario efficiently, because it only fetches the events that are actually missing.

-> HistoryDependent is useless.

Notes

The proposed solution could be implemented as a generalized solution for any kind of channel, which projection.BeforeEvent can use under the hood:

package example

func example(events <-chan event.Event) {
  events = streams.Before(events, func(evt event.Event) []event.Event {
    if evt.Name() == "foo" {
      return []event.Event{...}
    }
    return nil
  })
}
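A generic version of such a channel helper could look like the following. `Before` is a hypothetical function written for this sketch (it is not an existing goes API): for every value read from the input channel it first emits the values returned by the callback, then the value itself. It takes no context, so a consumer that stops reading early would leak the goroutine; a real implementation would likely accept a `context.Context`.

```go
package main

import "fmt"

// Before returns a channel that emits, for each value v read from in, the
// values returned by fn(v) followed by v itself. The returned channel is
// closed once in is closed.
func Before[T any](in <-chan T, fn func(T) []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for v := range in {
			for _, extra := range fn(v) {
				out <- extra
			}
			out <- v
		}
	}()
	return out
}

func main() {
	in := make(chan string, 2)
	in <- "bar"
	in <- "foo"
	close(in)

	out := Before(in, func(name string) []string {
		if name == "foo" {
			return []string{"before-foo"}
		}
		return nil
	})
	for v := range out {
		fmt.Println(v) // bar, before-foo, foo
	}
}
```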

ProgressAware – Scenario: Persistent Read-Model

Consider the same e-commerce app as before, now with an order service. A read-model is projected for each order. On service startup, all orders should be brought up to their current, correct state; afterwards, the orders should be projected continuously as order events are published.

Code Example

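package order

import (
  "context"
  "fmt"
  "log"

  "github.com/google/uuid"
  "github.com/modernice/goes/aggregate"
  "github.com/modernice/goes/event"
  "github.com/modernice/goes/event/query"
  "github.com/modernice/goes/helper/streams"
  "github.com/modernice/goes/persistence/model"
  "github.com/modernice/goes/projection"
  "github.com/modernice/goes/projection/schedule"
)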

const (
  Placed = "order.placed"
  Canceled = "order.canceled"
)

type PlacedData struct {
  Items []Item
}

type Order struct {
  *aggregate.Base
  OrderDTO
}

type OrderDTO struct {
  ID uuid.UUID
  Items []Item
}

type Item struct {
  ProductID uuid.UUID
  Quantity uint
  UnitPrice uint64
}

// Domain logic
func (o *Order) Place(items []Item) error { ... }
func (o *Order) Cancel() error { ... }

type ReadModelRepository = model.Repository[*OrderRM, uuid.UUID]

// OrderRM is the read-model of an order.
type OrderRM struct {
  *projection.Progressor
  projection.Guard

  OrderDTO
}

func NewOrderRM(id uuid.UUID) *OrderRM {
  return &OrderRM{
    Progressor: projection.NewProgressor(),

    // Only allow events of the specific order to be applied onto the read-model.
    Guard: projection.QueryGuard(query.New(query.Aggregate("order", id))),

    OrderDTO: OrderDTO{ID: id},
  }
}

type Projector struct {
  orders ReadModelRepository
  schedule *schedule.Continuous
}

func NewProjector(orders ReadModelRepository, bus event.Bus, store event.Store) *Projector {
  return &Projector{
    orders: orders,
    schedule: schedule.Continuously(bus, store, []string{Placed, Canceled}),
  }
}

func (proj *Projector) Run(ctx context.Context) (<-chan error, error) {
  errs, err := proj.schedule.Subscribe(ctx, proj.applyJob)
  if err != nil {
    return nil, fmt.Errorf("subscribe to projection schedule: %w", err)
  }

  // Project all orders on startup.
  go proj.schedule.Trigger(ctx)

  return errs, nil
}

func (proj *Projector) applyJob(ctx projection.Job) error {
  orders, err := proj.extractOrdersFromJob(ctx)
  if err != nil {
    return fmt.Errorf("extract orders from job: %w", err)
  }

  for _, orderID := range orders {
    if err := proj.orders.Use(ctx, orderID, func(o *OrderRM) error {
      // Apply all events in the job onto the read-model.
      // The projection.QueryGuard of the read-model ensures that only the events of this specific order are applied.
      return ctx.Apply(ctx, o)
    }); err != nil {
      log.Printf("failed to project order: %v [order=%s]", err, orderID)
    }
  }

  return nil
}

func (proj *Projector) extractOrdersFromJob(ctx projection.Job) ([]uuid.UUID, error) {
  str, errs, err := ctx.Aggregates(ctx)
  if err != nil {
    return nil, err
  }

  refs, err := streams.Drain(ctx, str, errs)
  if err != nil {
    return nil, err
  }

  ids := make([]uuid.UUID, len(refs))
  for i, ref := range refs {
    ids[i] = ref.ID
  }
  return ids, nil
}

Problem

To project all order read-models on startup, the projector uses the manual trigger feature of the projection schedule. On startup, the following happens:

  1. Projector subscribes to the schedule (the schedule subscribes to the order events)
  2. Projector manually triggers the schedule
  3. Schedule creates a projection job. The job uses the event store of the schedule to do any event queries.
  4. Projector extracts the order ids from the job's events (which include all orders, because a manually triggered job fetches the entire history)
  5. Projector iterates over the order ids and applies the events onto the read-model of each order.
  6. Job checks if the projection implements ProgressAware, and if so, filters the events by their time before applying them.
  7. Job applies the events onto the orders with the progress check of the ProgressAware API disabled.

Steps 6 and 7 are the odd part: the event filter in step 6 effectively does the same as the progress check that is disabled in step 7. We could simply remove step 6 and enable the progress check in step 7.

Reason for disabling progress checks

When the ProgressAware API was added to goes, progress checks weren't disabled. That behavior could potentially have led to invalid projection builds because of the following problem:

A projection may require events of many different aggregates. While goes guarantees that no two events in an aggregate's event stream have the same time, the same is not true for events of different aggregates, so a projection may apply multiple events that have the same time. Because the ProgressAware API tracks the projection progress only by the time of the last applied event, an event could be skipped if another event with the same time had already been applied. For this reason, progress checks were disabled in projection jobs.

But this doesn't really make sense: the progress check was just moved into the projection job as a filter, which does exactly the same as before and therefore has the same flaw.
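The flaw of time-only progress tracking shows up as soon as two events of different aggregates share a timestamp. In this stdlib-only sketch, `TimeProgress` is a hypothetical tracker that, like the check described above, only remembers the time of the last applied event.

```go
package main

import (
	"fmt"
	"time"
)

// Event is a hypothetical stand-in for an event of some aggregate.
type Event struct {
	ID          string
	AggregateID string
	Time        time.Time
}

// TimeProgress tracks progress only by the time of the last applied event.
type TimeProgress struct{ last time.Time }

// Apply reports whether evt is applied; events that are not strictly newer
// than the last applied event are skipped.
func (p *TimeProgress) Apply(evt Event) bool {
	if !evt.Time.After(p.last) {
		return false
	}
	p.last = evt.Time
	return true
}

// at returns a time sec seconds after the Unix epoch.
func at(sec int) time.Time { return time.Unix(int64(sec), 0) }

func main() {
	var p TimeProgress
	// Events of two different orders that happen to share the same time.
	fmt.Println(p.Apply(Event{"e1", "order-1", at(10)})) // true
	fmt.Println(p.Apply(Event{"e2", "order-2", at(10)})) // false: e2 is wrongly skipped
}
```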

Solution to the design flaw

Progress will be tracked not only by time but also by the ids of the applied events that have that exact time. An event is applied if its time is after the time of the last applied events, or if its time equals that time and its id is not among the tracked ids.

  • Step 6 can be removed
  • IgnoreProgress() option can be removed from step 7
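The rule above can be sketched as a small tracker. This is a hypothetical, stdlib-only illustration of the described algorithm, not the goes `Progressor` implementation: it stores the time of the last applied events plus the set of ids applied at exactly that time, and resets the set whenever a newer time is applied.

```go
package main

import (
	"fmt"
	"time"
)

// Event is a hypothetical stand-in for an event; only the ID and time matter here.
type Event struct {
	ID   string
	Time time.Time
}

// Progress tracks the time of the last applied events and the ids of all
// applied events that have exactly that time.
type Progress struct {
	last time.Time
	ids  map[string]struct{}
}

// Apply reports whether evt should be applied and, if so, records it.
func (p *Progress) Apply(evt Event) bool {
	switch {
	case p.ids == nil || evt.Time.After(p.last):
		// First event or a newer time: start a new id set for this time.
		p.last = evt.Time
		p.ids = map[string]struct{}{evt.ID: {}}
		return true
	case evt.Time.Equal(p.last):
		// Same time: apply only if this id hasn't been applied yet.
		if _, seen := p.ids[evt.ID]; seen {
			return false
		}
		p.ids[evt.ID] = struct{}{}
		return true
	default:
		return false // older than the last applied events
	}
}

// at returns a time sec seconds after the Unix epoch.
func at(sec int) time.Time { return time.Unix(int64(sec), 0) }

func main() {
	var p Progress
	fmt.Println(p.Apply(Event{"e1", at(10)})) // true
	fmt.Println(p.Apply(Event{"e2", at(10)})) // true: same time, different id
	fmt.Println(p.Apply(Event{"e1", at(10)})) // false: already applied
	fmt.Println(p.Apply(Event{"e3", at(11)})) // true
}
```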

Solution to the performance issue

Instead of fetching the entire event history from the event store, projection jobs modify the query so that the query respects the progress of a ProgressAware projection. This differs from the current implementation of step 6 in that it's not implemented as an in-memory filter (that is applied after the actual query) but as an actual change to the query itself.
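How a job might narrow a query with a projection's progress can be sketched as follows. `Query`, `WithProgress` and `Run` are hypothetical simplifications (the goes query package is not used); the slice-backed `Run` stands in for an event store that would translate the time bound into its database query. The bound is inclusive so that same-time events that weren't applied yet are still fetched, leaving those to the id-based check.

```go
package main

import (
	"fmt"
	"time"
)

type Event struct {
	ID   string
	Name string
	Time time.Time
}

// Query is a hypothetical, simplified event query.
type Query struct {
	Names   []string
	MinTime time.Time // only events at or after MinTime match; zero means no bound
}

// WithProgress returns a copy of q restricted to events at or after last,
// the time of the projection's last applied events.
func WithProgress(q Query, last time.Time) Query {
	q.MinTime = last
	return q
}

// Matches reports whether evt matches q.
func (q Query) Matches(evt Event) bool {
	if evt.Time.Before(q.MinTime) {
		return false
	}
	for _, name := range q.Names {
		if evt.Name == name {
			return true
		}
	}
	return false
}

// Run stands in for an event store executing q.
func Run(store []Event, q Query) []Event {
	var out []Event
	for _, evt := range store {
		if q.Matches(evt) {
			out = append(out, evt)
		}
	}
	return out
}

func at(sec int) time.Time { return time.Unix(int64(sec), 0) }

func main() {
	store := []Event{
		{"e1", "order.placed", at(1)},
		{"e2", "order.placed", at(2)},
		{"e3", "order.canceled", at(3)},
	}
	q := Query{Names: []string{"order.placed", "order.canceled"}}

	fmt.Println(len(Run(store, q)))                      // 3: entire history
	fmt.Println(len(Run(store, WithProgress(q, at(2))))) // 2: e2 and e3
}
```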

To extract the order ids through the job without actually fetching all order events from the store, the projector does the following:

package order

func (proj *Projector) Run(ctx context.Context) (<-chan error, error) {
  // The projection.Startup() option triggers a projection job on startup, using the provided query
  // to fetch the events from which the order ids are extracted. In this case, we fetch all Placed events.
  return proj.schedule.Subscribe(ctx, proj.applyJob, projection.Startup(query.New(
    query.Name(Placed),
  )))
}

func (proj *Projector) applyJob(ctx projection.Job) error {
  orders, err := proj.extractOrdersFromJob(ctx)
  if err != nil {
    return fmt.Errorf("extract orders from job: %w", err)
  }

  for _, orderID := range orders {
    if err := proj.orders.Use(ctx, orderID, func(o *OrderRM) error {
      // The job queries the event store for all configured events.
      // The progress of the read-model is used to query only those events
      // that haven't been applied yet.
      return ctx.Apply(ctx, o)
    }); err != nil {
      log.Printf("failed to project order: %v [order=%s]", err, orderID)
    }
  }

  return nil
}

func (proj *Projector) extractOrdersFromJob(ctx projection.Job) ([]uuid.UUID, error) {
  // Here, the query that was passed to projection.Startup() is used to extract the orders.
  str, errs, err := ctx.Aggregates(ctx)
  if err != nil {
    return nil, err
  }

  refs, err := streams.Drain(ctx, str, errs)
  if err != nil {
    return nil, err
  }

  ids := make([]uuid.UUID, len(refs))
  for i, ref := range refs {
    ids[i] = ref.ID
  }
  return ids, nil
}

Explanation

A new projection.Startup() option can be provided when subscribing to a projection schedule. If provided, the schedule triggers a projection job on startup and changes the job's Aggregates() helper to use the query that was passed to projection.Startup() instead of the default query, which fetches the entire history of the configured events. This makes extracting the orders much more efficient, because only one event (the Placed event) needs to be fetched per order.
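The saving can be illustrated by comparing the default extraction with the startup query. This stdlib-only sketch uses hypothetical helpers: `QueryNames` stands in for an event store query, and `AggregateIDs` for the deduplication that Aggregates() performs.

```go
package main

import "fmt"

type Event struct {
	Name        string
	AggregateID string
}

// QueryNames returns the events in store whose name is one of names.
func QueryNames(store []Event, names ...string) []Event {
	var out []Event
	for _, evt := range store {
		for _, name := range names {
			if evt.Name == name {
				out = append(out, evt)
				break
			}
		}
	}
	return out
}

// AggregateIDs returns the unique aggregate ids of events, in order of first occurrence.
func AggregateIDs(events []Event) []string {
	seen := make(map[string]bool)
	var ids []string
	for _, evt := range events {
		if !seen[evt.AggregateID] {
			seen[evt.AggregateID] = true
			ids = append(ids, evt.AggregateID)
		}
	}
	return ids
}

func main() {
	store := []Event{
		{"order.placed", "o1"},
		{"order.canceled", "o1"},
		{"order.placed", "o2"},
	}
	all := QueryNames(store, "order.placed", "order.canceled") // default: full history
	startup := QueryNames(store, "order.placed")             // startup query

	fmt.Println(len(all), AggregateIDs(all))         // 3 [o1 o2]
	fmt.Println(len(startup), AggregateIDs(startup)) // 2 [o1 o2]
}
```

Both queries yield the same order ids, but the startup query reads one event per order regardless of how many events each order has.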

Currently, when applying a projection job onto one or multiple projections, the actual event query is only executed once. Subsequent calls to a job's Events(), EventsOf(), Aggregates() and Apply() methods re-use the result of the first query and only add in-memory filters to the result to return the correct events for a given projection.

With this change, instead of running a single (possibly costly) query once and then applying in-memory filters for each individual projection, a query is executed for each individual projection, but only within a job's EventsOf() and Apply() methods. The Events() method still queries the entire event history from the store, and Aggregates() either uses the query that was provided to projection.Startup() or, if none was provided, also fetches the entire history to do the extraction.

Either way, a job caches the results of all its queries, so the same query never runs twice.
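Per-job query caching can be sketched as memoization keyed by the query. `Job`, `Query` and `Fetch` are hypothetical names for this illustration only; the cache key is order-sensitive (the same names in a different order count as a different query), and the type is not safe for concurrent use.

```go
package main

import "fmt"

// Query is a hypothetical query: event names plus an optional aggregate id.
type Query struct {
	Names       []string
	AggregateID string
}

// key builds the cache key for q.
func (q Query) key() string {
	return fmt.Sprintf("%v|%s", q.Names, q.AggregateID)
}

// Job memoizes query results so that each distinct query runs at most once.
type Job struct {
	run   func(Query) []string // executes a query against the store
	cache map[string][]string
	Runs  int // number of queries actually sent to the store
}

func NewJob(run func(Query) []string) *Job {
	return &Job{run: run, cache: make(map[string][]string)}
}

// Fetch returns the cached result for q and only runs q on a cache miss.
func (j *Job) Fetch(q Query) []string {
	k := q.key()
	if res, ok := j.cache[k]; ok {
		return res
	}
	j.Runs++
	res := j.run(q)
	j.cache[k] = res
	return res
}

func main() {
	job := NewJob(func(q Query) []string {
		return []string{"events of " + q.AggregateID}
	})
	names := []string{"order.placed", "order.canceled"}

	job.Fetch(Query{Names: names, AggregateID: "o1"})
	job.Fetch(Query{Names: names, AggregateID: "o1"}) // served from the cache
	job.Fetch(Query{Names: names, AggregateID: "o2"})
	fmt.Println(job.Runs) // 2
}
```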

Conclusion

The current ProgressAware implementation is flawed because it does not account for multiple events that have the same time. The solution is to also track the ids of the last applied events that have that time. With this corrected implementation, projection jobs can optimize queries for projections to avoid fetching the entire event history of a given set of events.

-> ProgressAware is useful.

Closed by #36