Asynchronous event sourcing.
Library provides a logical layer for storing and querying events as a stream.
Heavily inspired by Greg Young's Event Store and Streamstone
solutions.
Designed to be easily extended with custom database backends.
Despite the fact that component implements a logical layer for storing and querying events as a stream,
it does not provide functionality of DDD aggregate
, such as state mutation, conflict resolution etc., but serves more as persistence layer
for it.
Package | Description | Multitenancy | Package |
---|---|---|---|
StreamStore.NoSql.Cassandra | Apache Cassandra and Azure Cosmos DB for Apache Cassandra port |
✅ | |
StreamStore.Sql.PostgreSql | PostgreSQL port |
✅ | |
StreamStore.Sql.Sqlite | SQLite port |
✅ | |
StreamStore.InMemory | In-memory port is provided for testing and educational purposes only |
✅ | |
StreamStore.S3.AWS | Amazon S3 port |
❌ | |
StreamStore.S3.B2 | Backblaze B2 port |
❌ |
About basic concepts you can read in CONCEPTS.md.
The general idea is to highlight the common characteristics and features of event sourcing storage:
- Asynchronous read and write operations.
- Multitenancy support.
- Automatic provisioning of storage schema.
- Event ordering.
- Serialization/deserialization of events.
- Optimistic concurrency control.
- Event duplication detection based on event ID.
- Database agnostic test framework, including benchmarking test scenarios.
- Binary serialization support.
Also add implementations of particular storage, such as:
-
In-Memory
- for testing purposes. -
Binary Object
storages:-
Backblaze B2
- Backblaze B2. -
Amazon S3
- Amazon S3.
-
-
SQL
based DBMS: -
NoSQL
based DBMS:
- Composite stream identifier
- Custom event properties (?).
- External transaction support (?).
- Transactional outbox pattern implementation (?).
To install the package, you can use the following command from the command line:
# Install StreamStore package
dotnet add package StreamStore
# Install package of particular database implementation, for instance InMemory
dotnet add package StreamStore.InMemory
or from NuGet Package Manager Console:
# Install StreamStore package
Install-Package StreamStore
# Install package of particular database implementation, for instance SQLite database backend
Install-Package StreamStore.Sql.Sqlite
- Register store in DI container
services.ConfigureStreamStore(x => // Register StreamStore
x.EnableSchemaProvisioning() // Optional. Enable schema provisioning, default: false.
// Register single database implementation, see details in documentation for particular database
x.WithSingleDatabase(c =>
c.UseSqliteDatabase(x => // For instance, SQLite database backend
x.ConfigureDatabase(c =>
c.WithConnectionString(connectionString)
)
)
)
// Or enable multitenancy, see details in documentation for particular database.
x.WithMultitenancy(c =>
c.UseInMemoryDatabase() // For instance, InMemory database backend
.UseTenantProvider<MyTenantProvider>() // Optional. Register your ITenantProvider implementation.
// Required if you want schema to be provisioned for each tenant.
)
);
- Use store in your application
// Inject IStreamStore in your service or controller for single database implementation
public class MyService
{
private readonly IStreamStore store;
public MyService(IStreamStore store)
{
this.store = store;
}
}
// Or IStreamStoreFactory for multitenancy
public class MyService
{
private readonly IStreamStoreFactory storeFactory;
public MyService(IStreamStoreFactory storeFactory)
{
this.storeFactory = storeFactory;
}
}
// Append events to stream or create a new stream if it does not exist
// EventObject property is where you store your event
var events = new Event[] {
new Event { Id = "event-1", Timestamp = DateTime.Now, EventObject = eventObject }
...
};
try {
store
.BeginWriteAsync("stream-1") // Open stream like new since revision is not provided
.AppendEventAsync(x => // Append events one by one using fluent API
x.WithId("event-3")
.Dated(DateTime.Now)
.WithEvent(eventObject)
)
...
.AppendRangeAsync(events) // Or append range of events by passing IEnumerable<Event>
.CommitAsync(token);
} catch (StreamConcurrencyException ex) {
// Read from stream and implement your logic for handling optimistic concurrency exception
await foreach(var @event in await store.BeginReadAsync("stream-1", token)) {
...
}
// Push result to the end of stream
store
.BeginWriteAsync("stream-1", ex.ActualRevision)
.AppendEventAsync(x => // Append events one by one using fluent API
x.WithId( "event-4")
.Dated(DateTime.Now)
.WithEvent(yourEventObject)
)
...
.CommitAsync(streamId);
} catch (StreamLockedException ex) {
// Some database backends like S3 do not support optimistic concurrency control
// So, the only way to handle concurrency is to lock the stream
}
More examples of reading and writing events you can find in test scenarios of StreamStore.Testing project.
Each type of storage has its own example project, for instance, you can find an example of usage in the StreamStore.Sql.Example project.
Example projects provides a simple console application that demonstrates how to configure and use StreamStore
in your application as single database or multitenancy.
Single database
examples demonstrates:
- optimistic concurrency control
- asynchronous reading and writing operations
Multitenancy
examples, in turn, demonstrates asynchronous reading and writing operations in isolated tenant storage.
For getting all running options simply run the application with --help
argument.
For configuring application via configuration file, create appsettings.Development.json
file.
How to create your own storage implementation you can find in CUSTOMIZATION.md.
If you experience any issues, have a question or a suggestion, or if you wish to contribute, feel free to open an issue or start a discussion.