/servicestack-eventstore

ServiceStack Plugin for event-sourced systems using GetEventStore

Primary LanguageC#OtherNOASSERTION

ServiceStack.EventStore

Build status NuGet version

A plugin for ServiceStack that provides a message gateway to EventStore streams.

By adding this plugin to an application, such as a Windows Service, the application is able to:

  • Connect to a running instance (or cluster) of EventStore.
  • Subscribe to and handle events from named streams.
  • Persist an aggregate to, and rehydrate it from, an EventStore stream.
  • Populate a read model using events.

Requirements

An instance of the EventStore server should be running on the network. Please follow the installation instructions provided by EventStore.

You can verify that EventStore is running by browsing to port 2113 on the machine running the EventStore server.

Getting Started

Install the package from Nuget

Install-Package ServiceStack.EventStore

Setting up a Connection to EventStore

Add the following code to the Configure method in the AppHost class (this class is created automatically when you use one of the ServiceStack project templates). Additionally, you can take advantage of the ServiceStack MetadataFeature to provide a link to the EventStore admin UI by providing the HTTP address of the EventStore instance:

public override void Configure(Container container)
{
    //Register the EventStore plugin with ServiceStack, passing in 
    //the assembly that contains the CLR events (see below)
    Plugins.Add(new EventStoreFeature(typeof(ClrEvent).Assembly)); 
    //Optionally register the Metadata plugin
    Plugins.Add(new MetadataFeature());
}

Then, add the following settings to the App.config file of the root project:

<configuration>
    <startup> 
        <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
    </startup>
    <appSettings>
      <add key="ServiceStack.Plugins.EventStore.TcpEndPoint" value="localhost:1113"/>
      <add key="ServiceStack.Plugins.EventStore.HttpEndPoint" value="localhost:2113"/>
      <add key="ServiceStack.Plugins.EventStore.UserName" value="admin"/>
      <add key="ServiceStack.Plugins.EventStore.Password" value="changeit"/>
    </appSettings>
</configuration>

It is also possible to pass in a ready-made instance of IEventStoreConnection in the constructor of the EventStoreFeature. This has been made available to facilitate the integration test suite that now uses the EventStore Embedded Client and which passes in an instance of EmbeddedEventStoreConnection.

Please note that this sample assumes that:

  • EventStore is running on your local host. 1113 is the TCP port at which you can listen for events and 2113 is the HTTP port. These are the default ports that EventStore uses.

Subscribing to Named Streams

There are four different kinds of subscriptions to streams that ServiceStack.EventStore can create:

Subscription Type Description Expected Parameters
Volatile Provides access to an EventStore volatile subscription, which starts reading from the next event published on a named stream following successful connection by the plugin. The stream name.
Persistent Provides access to an EventStore persistent subscription, which supports the competing consumers messaging model on a named stream. The stream name and the subscription group.
Catch-Up Provides access to an EventStore catch-up subscription, which starts reading from either the beginning of a named stream or from a specified event number on that stream. The stream name.
Read Model Also provides access to an EventStore catch-up subscription, with the difference that it automatically subscribes to **all** streams ("$all" in EventStore) to enable a read model to be populated from selected events from different streams. None.

Subscriptions can be created as follows in the Configure method (we will cover read model subscriptions separately):

public override void Configure(Container container)
{
    var settings = new SubscriptionSettings()
	        	.SubscribeToStreams(streams =>
            	    	{
         	    	    //read a stream from the first event
                	    streams.Add(new CatchUpSubscription("stream_name"));
                            //read a stream from this moment forward
                            streams.Add(new VolatileSubscription("stream_name"));
                            //receive events from a stream as a competing consumer
                            streams.Add(new PersistentSubscription("stream_name", "subscription_group_name"));
                	});

 	...connection set-up omitted

	// Note the extra 'settings' parameter being used when creating an instance of the EventStoreFeature
	Plugins.Add(new EventStoreFeature(settings, typeof(ClrEvent).Assembly));
}

Modelling Events

The content of events in EventStore is stored in JSON format. In a language based on the .Net CLR we model each type of event that we want to work with as a class:

public class OrderCreated
{
    public Guid OrderId {get; set;}
    public DateTime Created {get; set;}
}

There is no need for such a class to implement a particular interface or inherit from a parent class. Rather, as you have seen, when registering the EventStore plugin we pass in a reference to the assembly (or assemblies) that contain the relevant classes:

public override void Configure(Container container)
{
    ...
    Plugins.Add(new EventStoreFeature(settings, typeof(SomeEvent).Assembly, typeof(AnotherEvent).Assembly);
}

Publishing Events

By adding the ServiceStack.EventStore package to your project we can access the EventStoreRepository through constructor injection and use it to publish asynchronously to a named stream:

public class FlightService: ServiceStack.Service
{
    public IEventStoreRepository EventStore { get; set; }
	
    public async Task DoSomething()
    {
       ...
	await EventStore.PublishAsync(new SomethingHappened(), "targetstream");
    }
}

For information about setting and reading the headers for an event, please refer to the Wiki

Handling Events

This plugin makes use of ServiceStack's architecture to route events from EventStore streams to their handlers which are implemented as methods on a service class:

Routing Events

To handle an event on a stream to which you have subscribed simply create a class that inherits from ServiceStack.Service and add an endpoint for each event you wish to handle:

public class PurchaseOrderService : Service
{
    public object Any(PurchaseOrderCreated @event)
    {
	//handle event
    }

    public object Any(OrderLineItemsAdded @event)
    {
	//handle event
    }
}

Setting a Retry Policy

When creating a subscription you can also specify the retry policy used by ServiceStack.EventStore in response to a subscription to EventStore being dropped. Since the retry functionality builds on the Polly library, the retry policy can be set by either specifying a parameter array TimeSpan or a delegate.

For example, in the Configure method we can specify a series of TimeSpans that tell the plugin that in the event of a specified subscription being dropped it should wait one second before retrying the subscription. And then three seconds after that. And then five seconds after that:

var settings = new SubscriptionSettings()
	            .SubscribeToStreams(streams =>
       	            {
                    	streams
			   .Add(new VolatileSubscription("deadletterchannel")
                           .SetRetryPolicy(1.Seconds(), 3.Seconds(), 5.Seconds()));
                    });

Alternatively, we can also tell the plugin to use an exponential back-off to multiplicatively increase the time to wait, for a specified maximum number of retry attempts, before attempting to resubscribe:

var settings = new SubscriptionSettings()
	            .SubscribeToStreams(streams =>
    	            {
                    	streams.Add(new VolatileSubscription("deadletterchannel")
                        			.SetRetryPolicy(10.Retries(), 
                                		retryCounter => TimeSpan.FromSeconds(Math.Pow(2, retryCounter))));
                    });

Aggregates

This plugin supports the event-sourced aggregates pattern whereby the state of an aggregate object is mutated by means of events that are raised in response to commands executed against that aggregate. Every event is raised in response to a command is held in memory until the aggregate is persisted to the event store. Following the event-sourcing mode, it is not the state of the aggregate that is persisted but, rather, the events which have led the aggregate to be in its current state.

When the aggregate is loaded (or 'rehydrated' in the parlance of event sourcing), again, it is not the state as such of the aggregate that is loaded but, rather, the events which were previously persisted to the event store. These events are re-applied to the aggregate (in exactly the same way they were when the original commands were executed) to reach the proper state of the aggregate. As Greg Young has reiterated "Current State is a left fold of previous facts".

Event Sourced Aggregate

In many implementations of the event-sourced aggregate pattern to be found on the internet (such as here, here, and here) the aggregate is modelled as a single class exposing (1) API methods that raise events in response to commands, (2) event handlers that mutate state in response to these events being raised, and (3) fields that hold that state. ServiceStack.EventStore, however, supports the modelling of a (logical) aggregate as two distinct classes with single responsibilities: a class that inherits from Aggregate<TState> and exposes command methods that are responsible for validation of the commands and raising events in response to them.

An event is raised by using the Causes<TEvent> method:

public class Flight : Aggregate<FlightState>
{
    public Flight(Guid id) : base(id)
    {
    }
	
    public Flight(): base(Guid.NewGuid())
    {
       Causes(new FlightCreated(Id));
    }
	
    public void UpdateFlightNumber(string newFlightNumber)
    {
	if (!string.IsNullOrEmpty(destination))
		Causes(new FlightNumberUpdated(newFlightNumber));
    }
	
    public void UpdateDestination(string destination)
    {
	if (!string.IsNullOrEmpty(destination))
		Causes(new DestinationChanged(destination));
    }
}

And a class that inherits from State that encapsulates the state of the aggregate and implements handlers for the events raised and mutates the state. An event SomethingHappened raised in the class that inherits Aggregate<TState> is handled by simply implementing a method On(SomethingHappened @event). The state of an aggregate should almost always be mutated by means of raising events and for that reason it is recommended that the fields of a state object be set as {get; private set;}:

public class FlightState : State
{
    public string FlightNumber { get; private set; }
    public string Destination { get; private set; }
	
    public void On(FlightCreated @event)
    {
    }
	
    public void On(FlightNumberUpdated @event)
    {
       FlightNumber = @event.NewFlightNumber;
    }
	
    public void On(DestinationChanged @event)
    {
       Destination = @event.Destination;
    }
}
public class FlightService
{
    private readonly IEventStoreRepository repo;
	
    public void FlightService(IEventStoreRepository repo)
    {
	this.repo = repo;
    }
	
    public async Task CancelFlight(Guid flightId)
    {
        var flight = await repo
			     .GetByIdAsync<Flight>(flightId)
			     .ConfigureAwait(false);
		
	flight.UpdateDestination("Dingwall International Airport");
		
	await repo.SaveAsync(flight);
    }
}

Configuring a Read Model Subscription

As mentioned previously, a read model subscription is similar to a catch-up subscription, with the difference being that a read model subscription subscribes to all streams in EventStore (to the $all projection) and, further, requires that a storage mechanism for the read model be specified.

A read model is essentially a projection of all events, or a subset thereof, that provides a stateful view of these events in a way that adds value to the end-users of the system.

Currently, the only storage model that is available is Redis:

var settings = new SubscriptionSettings()
                	.SubscribeToStreams(streams =>
                	{
                    	streams.Add(new ReadModelSubscription()
                                    .SetRetryPolicy(1.Seconds(), 3.Seconds())
                                    .WithStorage(new ReadModelStorage(StorageType.Redis, "localhost:6379")));
	                });

Please note that this code sample assumes that you have an instance of Redis installed on your local host which is using port 6379. Windows users can download the latest version of Redis from MSOpenTech or install it from Chocolatey.

Populating a Read Model

To populate a read model from subscribed event streams you need to do the following:

  • Create a ReadModelSubscription in the Configure method of the AppHost, as demonstrated above.
  • For each event type that you wish to consume from EventStore create a class. There is no need for such a class to implement an interface.
  • Create a class that inherits from ServiceStack.Service and add methods that take in the desired CLR event types.
  • Create a view model class to represent a record in the read model. Potentially, this could be a hierarchical object graph that could be persisted as a JSON document in Redis (or RavenDB ) or as a set of rows in RDBMS tables.
  • In the Service class instantiate a ProjectionWriter, specifying the type of the unique Id and the view model to be used.

Creating a Read Model

When handling an event that corresponds to a new record being required in the read model - for example, PurchaseOrderCreated - then use the Add method to create a new instance of desired view model. When handling events that should update the state of a record in the read model then use the Update method to pass in the Id of the record to be updated as well as a delegate that mutates the appropriate properties of the view model:

public class PurchaseOrderService : Service
{
    private IReadModelWriter<Guid, OrderViewModel> writer = 
                        ReadModelWriterFactory.GetRedisWriter<Guid, OrderViewModel>();

    public object Any(PurchaseOrderCreated @event)
    {
        return writer.Add(new OrderViewModel(@event.Id));
    }

    public object Any(OrderLineItemsAdded @event)
    {
        return writer.Update(@event.OrderId, 
                vm => vm.LineItemCount += @event.OrderLineItems.Count);
    }

    public object Any(OrderStatusUpdated @event)
    {
        return writer.Update(@event.OrderId,
            vm => vm.OrderStatus = @event.NewStatus);
    }
}

Attributions

This project leans gratefully on the following OS projects: