modernice/goes

Allowing Jetstream driver to use mirrors/sources

Closed this issue · 2 comments

I'm not sure whether this is within the scope of the project, but we have a scenario in which we would like developers to mirror or source the goes stream from another account, so that they can freely break their own environment while still being able to source "real" data. The current implementation of ensureStream uses the subject to verify that the stream is configured correctly. Could stream verification also take mirrors and sources into consideration?

If there is another approach to handling this use case please let me know.

I just skimmed over the NATS documentation to see how Mirrors and Sources work and I think this could be tested/implemented by adding a new JetStreamOption that intercepts the *nats.StreamConfig when created to allow for custom options. Something like the following:

package nats

func PatchStreamConfig(patch func(*nats.StreamConfig)) JetStreamOption {
  return func(js *jetStream) {
    js.streamConfigPatch = patch
  }
}

Then patch the config in another option:

package nats

func StreamSources(sources ...*nats.StreamSource) JetStreamOption {
  return PatchStreamConfig(func(cfg *nats.StreamConfig) {
    cfg.Sources = append(cfg.Sources, sources...)
  })
}

I'm not sure what additional configuration would be required to make this work, but the stream validation would need to take the Sources (and Mirror) options into consideration.
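To make the validation idea concrete, here is a minimal sketch of a check that accepts a stream either because it lists a matching subject or because it is fed by a mirror or sources. Everything in it is an assumption for illustration: `streamConfig` and `streamSource` are simplified stand-ins for the NATS client types, `acceptsSubject` is a hypothetical helper rather than the actual ensureStream logic, and "goes.foo" is a made-up subject.

```go
package main

import (
	"fmt"
	"strings"
)

// streamSource and streamConfig are simplified, hypothetical stand-ins
// for the stream source and stream config types of the NATS client.
type streamSource struct{ Name string }

type streamConfig struct {
	Subjects []string
	Mirror   *streamSource
	Sources  []*streamSource
}

// subjectMatches applies NATS wildcard rules: "*" matches exactly one
// token and a trailing ">" matches one or more remaining tokens.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) || (tok != "*" && tok != s[i]) {
			return false
		}
	}
	return len(p) == len(s)
}

// acceptsSubject is a hypothetical validation rule: the stream is valid
// if it binds a matching subject itself, or if it receives its data
// from a mirror or from sources (whose subjects are not checked here).
func acceptsSubject(cfg streamConfig, subject string) bool {
	for _, pattern := range cfg.Subjects {
		if subjectMatches(pattern, subject) {
			return true
		}
	}
	return cfg.Mirror != nil || len(cfg.Sources) > 0
}

func main() {
	sourced := streamConfig{Sources: []*streamSource{{Name: "prod"}}}
	fmt.Println(acceptsSubject(sourced, "goes.foo"))
}
```

Whether a mirrored or sourced stream should pass without further checks is a design decision; a stricter variant could look up the upstream stream and validate its subjects as well.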


For your use case, I think the simplest solution would be to implement a universal "multi-source" event bus that delegates queries to the underlying event buses.

// event/eventbus/merged.go

func NewMerged(rw event.Bus, r ...event.Bus) event.Bus {
  // ...
}

package example

func example(natsBus *nats.EventBus, memBus event.Bus) {
  // Create a new event bus using the in-memory bus for "read" and "write"
  // and the NATS bus only for "read"
  bus := eventbus.NewMerged(memBus, natsBus)

  // Publish over the in-memory event bus
  err := bus.Publish(context.TODO(), event.New(...))

  // Subscribe to "foo" events of in-memory _and_ NATS bus
  // Under the hood, the two event streams can be merged
  // using [github.com/modernice/goes/helper/streams.FanIn]
  events, errs, err := bus.Subscribe(context.TODO(), "foo")
}

This implementation would publish only over the "development" event bus but could also subscribe to events from the "production" event bus.

Do you think this solution could be applied to your problem? Please let me know if I'm missing something here.

The solution I came up with was to create a mirror and then create a stream that sources the mirror and adds the expected subjects.