modernice/goes

Google PubSub backend implementation for the event.Bus

Closed this issue · 3 comments

Summary

Provide an Event Bus backend implementation backed by Google PubSub event broker.

Motivation

We are deploying our application GCE and we need an event bus to publish events so the application can use an event driven architecture. One of the uses is as for Event Sourcing and to allow the projectors to see events from aggregates.

We evaluated Google PubSub and saw that it supports Event Ordering. An event can provide an Ordering Key and the infrastructure will warrant the events for the same Ordering Key will be provided in order. Our intent is to use the Aggregate ID as the ordering key.

Proposal

The idea is to implement the event.Bus interface using the cloud.google.com/go/pubsub package. This implementation will take the nats.bus implementation as a base.

Using topicFunc (like subjectFunc in nats implementation), all events of a given aggregate type can be published on the same PubSub topic.

Also the Aggregate ID in the event can be used as an Ordering Key.

Note

I'm using this implementation to learn about goes and its capabilities. Any suggestion or guidence us wellcome.

Sorry for the late response, I'm currently a bit too busy to take a deeper look into this but I took a look at the Google Pub/Sub event bus implementation of another event-sourcing library here, and they are implementing it as you described (using the Ordering Key).

One thing to consider is the "wildcard" feature that the NATS and In-Memory event buses support. I found this link that claims it should be possible but couldn't find a deeper guide for now.

Wildcard example

package example

func example(bus event.Bus) {
  // Subscribe to all events.
  events, errs, err := bus.Subscribe(context.TODO(), "*")
}

To test the event bus implementation, you can use the eventbustest package and simply call the test suites:

// backend/gcp/eventbus_test.go
package gcp_test

import (
	"testing"

	"github.com/modernice/goes/backend/testing/eventbustest"
	"github.com/modernice/goes/codec"
	"github.com/modernice/goes/event"
	"github.com/modernice/goes/event/eventbus"
)

func TestEvenBus(t *testing.T) {
	eventbustest.RunCore(t, newBus)
	eventbustest.RunWildcard(t, newBus)
}

func newBus(codec.Encoding) event.Bus {
	// Return Pub/Sub event bus.
}

Hope this helps, and if you have any questions, I'm happy to help 🙂

Hi @bounoable ,

I have a doubt regarding the Bus interface. The consumer will receive a channel that will provide the events to consume.

This schema does offer no warranty regarding the events are processed. If the bus deliver an event to a handler, and the handler is stopped in any way (the node goes down, a processing error, etc.) the event will be lost.

Are you using any mechanism to avoid loosing events?

Hi @bounoable ,

I have a doubt regarding the Bus interface. The consumer will receive a channel that will provide the events to consume.

This schema does offer no warranty regarding the events are processed. If the bus deliver an event to a handler, and the handler is stopped in any way (the node goes down, a processing error, etc.) the event will be lost.

Are you using any mechanism to avoid loosing events?

Hey @totemcaf,

yes you're right that you definitely cannot rely on the event bus to "confirm" processing of events. If your node goes down, or if your consumer is too slow you can "lose" events.

There are solutions to this issue, but it's hard giving a single answer to cover all possible scenarios. Most important is that you persist events in an event store if you need to provide this kind of guarantee.

In most cases, the easiest solution will be to create a projection that implements the ProgressAware API, and use the projection.Startup() option when subscribing to the projection schedule. This may be a bit hard to get your head around, so I'll give you an example.

Example

Given an application that publishes a "user.registered" event whenever a user creates an account, we want to send a welcome email to the user. The following code will ensure that each new user receives a welcome mail, even if the node goes down before sending the mail. This example also defines Model Repository for the mailer to persist its state (projection progress).

package example

import (
  "context"

  "github.com/google/uuid"
  "github.com/modernice/goes/projection"
  "github.com/modernice/goes/projection/schedule"
  "github.com/modernice/goes/persistence/model"
)

type WelcomeMailer struct {
  *projection.Progressor

  id uuid.UUID
}

func NewMailer(id uuid.UUID) *WelcomeMailer {
  return &WelcomeMailer{
    Progressor: projection.NewProgressor(),
    id: id,
  }
}

// ModelID implements model.Model (for the WelcomeMailerRepository).
func (wm *WelcomeMailer) ModelID() uuid.UUID {
  return wm.id
}

func (wm *WelcomeMailer) ApplyEvent(evt event.Event) {
  // Send welcome mail to user ...
}

type WelcomeMailerRepository = model.Repository[*WelcomeMailer, uuid.UUID]

// Defining a fixed UUID for the mailer because there will only ever be a single mailer.
var WelcomeMailerID = uuid.MustParse("949189d7-6de5-4c27-b8ea-91c3e3b8e33f")

func RunMailer(
  ctx context.Context,
  bus event.Bus,
  store event.Store,
  repo WelcomeMailerRepository,
) (<-chan error, error) {
  // Define a schedule that is triggered by "user.registered" events.
  s := schedule.Continuously(bus, store, []string{"user.registered"})

  return s.Subscribe(
    ctx,
    func(ctx projection.Job) error {
      // Fetch the mailer state from the repository.
      wm, err := repo.Fetch(ctx, WelcomeMailerID)
      if err != nil {
        return fmt.Errorf("welcome mailer not found: %w", err)
      }

      // Apply the events from the job to the WelcomeMailer.
      // In this example, it will only ever apply "user.registered" events.
      if err := ctx.Apply(ctx, wm); err != nil {
        return err
      }

      // Save the updated mailer back to the repository.
      return repo.Save(ctx, wm)
    },

    // The Startup() option fetches the "user.registered" events from the event store
    // and immediately creates a projection job for these events. Because WelcomeMailer
    // embeds *Progressor, only the events that haven't already been applied to the WelcomeMailer
    // will be fetched and applied.
    projection.Startup(),
  )
}
package main

import (
  "example"

  "github.com/google/uuid"
  "github.com/modernice/goes/backend/memory"
)

func main() {
  // Using in-memory repository, event bus, and event store for brevity.
  // In your application you should use production-ready backends (MongoDB, NATS etc.)
  repo := memory.NewModelRepository[*example.WelcomeMailer, uuid.UUID](memory.ModelFactory(example.NewMailer))
  bus := eventbus.New()
  store := eventstore.New()

  mailerErrors, err := example.RunMailer(context.TODO(), bus, store, repo)
  if err != nil {
    log.Panic(err)
  }

  for err := range mailerErrors {
    log.Println(err)
  }
}

This example might not be the best because "welcome mails" are arguably something that doesn't need this kind of guarantee in most applications 😄 But if you really need to give this guarantee then it's certainly possible to do this using the projection library.