Generic Eventsourcing / CQRS implementation in Go. For the sack of the example, we just go through a CRUD use case. Feel free to implement the read model with the appropriate database architecture that matches your usecase.
-
Define a new struct that embed
eventsourcing.AggregateBase
const GroupAggregateType eventsourcing.AggregateType = "group" // Group is the aggregate root type Group struct { *eventsourcing.AggregateBase[Group] // it has a name property that is private to make sure it is only updated // through domain events name string } // NewGroup is creates a new Group aggregate func NewGroup() *Group { return &Group{ AggregateBase: eventsourcing.NewAggregateBase[Group](uuid.Nil, 0), } } // AggregateType is required to implement the Aggregate interface func (g Group) AggregateType() eventsourcing.AggregateType { return GroupAggregateType } // Name makes the name property accessible func (g Group) Name() string { return g.name }
-
Define and register events
const EvtTypeGroupCreated eventsourcing.EventType = "group.created" // RegisterEvents registers events so they can be hydrated from the event store func RegisterEvents(registry eventsourcing.EventRegistry[Group]) { registry.Register(EvtTypeGroupCreated, func() eventsourcing.Event[Group] { return &EvtGroupCreated{ EventBase: &eventsourcing.EventBase[Group]{}, } }) registry.Register(EvtTypeGroupNameSet, func() eventsourcing.Event[Group] { return &EvtGroupNameSet{ EventBase: &eventsourcing.EventBase[Group]{}, } }) } // EvtGroupCreated is emitted when a group is created type EvtGroupCreated struct { *eventsourcing.EventBase[Group] } // NewEvtGroupCreated is a helper to create a new EvtGroupCreated event // with the appropriate event base func NewEvtGroupCreated(aggregateId uuid.UUID, aggregateVersion int, createdBy eventsourcing.User) *EvtGroupCreated { return &EvtGroupCreated{ EventBase: eventsourcing.NewEventBase[Group]( GroupAggregateType, aggregateVersion, EvtTypeGroupCreated, aggregateId, createdBy, ), } } // Apply performs the changes on the aggregate func (e EvtGroupCreated) Apply(g *Group) error { // helper to define the aggregate base properties (created_at, ..) // on creation event we call init instead of process g.Init(e) return nil } // EvtGroupNameSet is emitted when name is set type EvtGroupNameSet struct { *eventsourcing.EventBase[Group] // Name is the new name of the group //! Make sure to have all the properties of the event exported //! or they will not be marshalled / unmarshalled properly Name string } func NewEvtGroupNameSet(aggregateId uuid.UUID, aggregateVersion int, updatedBy eventsourcing.User, name string) *EvtGroupNameSet { return &EvtGroupNameSet{ EventBase: eventsourcing.NewEventBase[Group]( GroupAggregateType, aggregateVersion, EvtTypeGroupNameSet, aggregateId, updatedBy, ), Name: name, } } func (e EvtGroupNameSet) Apply(g *Group) error { // helper to define the aggregate base properties (modified_at, ..) g.Process(e) // update the aggregate name g.name = e.name return nil }
-
Create use case
// cmdCreate is the command to create a new group type cmdCreate struct { // to make cmdCreate a command eventsourcing.CommandBase[domain.Group] // Name the property of the command Name string `validate:"required"` } // CreateHandler is the usecase handler type CreateHandler struct { validator *validator.Validate commandHandler eventsourcing.CommandHandler[domain.Group] } func NewCreateHandler(commandHandler eventsourcing.CommandHandler[domain.Group]) CreateHandler { validator := validator.New(validator.WithRequiredStructEnabled()) return CreateHandler{ commandHandler: commandHandler, validator: validator, } } func (h CreateHandler) Create(ctx context.Context, issuer eventsourcing.User, name string) (*domain.Group, error) { // let's validate the command err := h.validator.Struct(cmd) if err != nil { log.Error().Err(err).Msg("create: invalid command") return nil, err } // aggregate is not loaded from the eventstore at this stage // it is cheap to perform any business logic check at this stage // like checking if it is possible to create a group for this issuer // we should return an error if the command should not be accepted return h.commandHandler.HandleCommand( ctx, cmdCreate{ CommandBase: eventsourcing.NewCommandBase[domain.Group]( uuid.New(), // new aggregate id domain.AggregateGroup, // aggregate type issuer, // tracking who did create that group ), Name: name, }, ) } // Apply returns a list of events that should be emitted when the command is applied func (c cmdCreate) Apply(aggregate *domain.Group) ([]eventsourcing.Event[domain.Group], error) { // aggregate is at this stage loaded from write model (eventstore) // more advanced business logic involving the aggregate itself // should be done here return []eventsourcing.Event[domain.Group]{ // the new group is created domain.NewEvtGroupCreated(c.AggregateId(), 0, c.IssuedBy()), // the name is set domain.NewEvtGroupNameSet(c.AggregateId(), 1, c.IssuedBy(), c.Name), }, nil }
At this point we have an event sourcing system for creating groups
The following is a generic read model implementation that suits development and tests purposes. The generic read model can also be used in combination with proper database implementation for scaffolding purposes but should ideally be replaced by a custom implementation to decrease the number of round trips and queries to the database.
- Create a new read model (in memory for the example, relying on the generic one from the library)
type GroupQuery struct { id *uuid.UUID } func NewGroupQuery(id *uuid.UUID) *GroupQuery { return &GroupQuery{ id: id, } } func (q GroupQuery) Id() *uuid.UUID { return q.id } type inMemoryReadModel struct { rm *readmodel.InMemoryReadModel[domain.Group] } func NewInMemoryReadModel(eventStream eventsourcing.Subscriber[domain.Group]) *inMemoryReadModel { return &inMemoryReadModel{ rm: readmodel.NewInMemoryReadModel[domain.Group]( eventStream, domain.NewGroup, domain.EvtTypeGroupCreated, eventsourcing.EvtTypeNil, // Replace by the appropriate event type when the event is implemented ), } } // HandleEvent definition is optional as Find and Get will handle the job // however it might be useful to have it exposed for testing purpose (manually injecting events) func (l *inMemoryReadModel) HandleEvent(e eventsourcing.Event[domain.Group]) { l.rm.HandleEvent(e) } func (l *inMemoryReadModel) Find(ctx context.Context, query GroupQuery) ([]*domain.Group, error) { return l.rm.Find(ctx, aggregateMatcher(query)) } func (l *inMemoryReadModel) Get(ctx context.Context, query GroupQuery) (*domain.Group, error) { return l.rm.Get(ctx, aggregateMatcher(query)) } // aggregateMatcher is a helper function to build the aggregate matcher based on the query // in a database scenario it would be used to build the where clause func aggregateMatcher(query GroupQuery) readmodel.AggregateMatcher[domain.Group] { var matcher readmodel.AggregateMatcher[domain.Group] if query != nil { matcher = readmodel.AggregateMatcherAnd[domain.Group]( readmodel.AggregateMatcherAggregateId[domain.Group](query.Id()), ) } return matcher }
- Create the list use case
type GroupReadModel interface { Find(ctx context.Context, query GroupQuery) ([]*domain.Group, error) Get(ctx context.Context, query GroupQuery) (*domain.Group, error) } type ListHandler struct { groupReadModel GroupReadModel } func NewListHandler(groupReadModel GroupReadModel) ListHandler { return ListHandler{ groupReadModel: groupReadModel, } } func (h ListHandler) List(ctx context.Context, issuedBy eventsourcing.User, query GroupQuery) ([]*domain.Group, error) { // TODO: check if possible to list groups for this issuer return h.groupReadModel.Find(ctx, query) }