Akka Persistence EventStore Plugin is a plugin for Akka Persistence
that provides the following components:
- write journal store
- snapshot store
- standard persistence queries
This plugin stores data in an EventStore database and is based on the EventStore.Client client library for .net45 and the EventStore.ClientAPI.NetCore client library for .netstandard2.0.
From NuGet Package Manager
Install-Package Akka.Persistence.EventStore
From .NET CLI
dotnet add package Akka.Persistence.EventStore
To activate the journal plugin, add the following line to your HOCON config:
akka.persistence.journal.plugin = "akka.persistence.journal.eventstore"
This will run the journal with its default settings. The defaults can be overridden with the following properties in your HOCON config:
- connection-string: connection string, as described here: https://eventstore.org/docs/dotnet-api/connecting-to-a-server/index.html#connection-string
- connection-name: connection name that tells the EventStore server who is connecting
- read-batch-size: how many events to bring back at a time when reading events back
- adapter: controls how the event data and metadata are stored and retrieved. See the Adapter section below for more information.
akka.persistence {
journal {
plugin = "akka.persistence.journal.eventstore"
eventstore {
connection-string = "ConnectTo=tcp://admin:changeit@localhost:1113; HeartBeatTimeout=500"
connection-name = "Akka"
}
}
}
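To show where this configuration ends up, here is a minimal sketch of an actor system that loads it and persists a few string events. `NotesActor`, its persistence id, and the `BuildConfig` helper are hypothetical names used only for illustration. The `class` entry is copied from the fuller configuration shown later in this README. The sketch assumes an EventStore node is listening on localhost:1113 and that the plugin's remaining defaults are picked up automatically.

```csharp
using System;
using System.Collections.Generic;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence;

// Hypothetical actor used only for illustration; it is not part of the plugin.
public class NotesActor : ReceivePersistentActor
{
    private readonly List<string> _notes = new List<string>();

    public override string PersistenceId => "notes-1";

    public NotesActor()
    {
        // Rebuild state from events replayed out of the journal.
        Recover<string>(note => _notes.Add(note));

        // Persist each incoming note, then update state and reply with the count.
        Command<string>(note => Persist(note, persisted =>
        {
            _notes.Add(persisted);
            Sender.Tell(_notes.Count);
        }));
    }
}

public static class Program
{
    // Same HOCON as above, plus the journal class from the fuller config in this README.
    public static Config BuildConfig() => ConfigurationFactory.ParseString(@"
        akka.persistence.journal {
            plugin = ""akka.persistence.journal.eventstore""
            eventstore {
                class = ""Akka.Persistence.EventStore.Journal.EventStoreJournal, Akka.Persistence.EventStore""
                connection-string = ""ConnectTo=tcp://admin:changeit@localhost:1113; HeartBeatTimeout=500""
                connection-name = ""Akka""
            }
        }");

    public static void Main()
    {
        using (var system = ActorSystem.Create("example", BuildConfig()))
        {
            var notes = system.ActorOf(Props.Create(() => new NotesActor()), "notes");

            // Blocks until the event has been written and the actor replies.
            var count = notes.Ask<int>("first note", TimeSpan.FromSeconds(5)).Result;
            Console.WriteLine($"Notes stored so far: {count}");
        }
    }
}
```

Restarting the program should replay the stored notes through the `Recover` handler before any new command is processed, so the reported count keeps growing across runs.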
Akka Persistence EventStore Plugin supports changing how data is stored and retrieved.
By default, it serializes the data using Newtonsoft.Json and populates the metadata with the following information:
{
"persistenceId": "p-14",
"occurredOn": "2018-05-03T10:28:06.3437687-06:00",
"manifest": "",
"senderPath": "",
"sequenceNr": 5,
"writerGuid": "f8706bba-52a7-4326-a760-990c7f657c46",
"clrEventType": "System.String, System.Private.CoreLib"
}
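Since the metadata is plain JSON, it can be inspected with Newtonsoft.Json directly, for example inside a custom adapter or a downstream consumer. The following is a small sketch under that assumption; `MetadataExample` and `Describe` are hypothetical names, and only the field names come from the sample above.

```csharp
using System;
using Newtonsoft.Json.Linq;

public static class MetadataExample
{
    // Builds a short "persistenceId#sequenceNr" label from the metadata JSON.
    public static string Describe(string metadataJson)
    {
        JObject metadata = JObject.Parse(metadataJson);

        // Explicit casts convert the JSON tokens to CLR values.
        string persistenceId = (string)metadata["persistenceId"];
        long sequenceNr = (long)metadata["sequenceNr"];

        return $"{persistenceId}#{sequenceNr}";
    }

    public static void Main()
    {
        const string json = @"{
            ""persistenceId"": ""p-14"",
            ""sequenceNr"": 5,
            ""clrEventType"": ""System.String, System.Private.CoreLib""
        }";

        Console.WriteLine(Describe(json));

        // clrEventType is an assembly-qualified type name; whether it resolves
        // depends on the runtime that reads it, so a null result must be handled.
        Type clrType = Type.GetType((string)JObject.Parse(json)["clrEventType"]);
        Console.WriteLine(clrType == null ? "type not resolvable here" : clrType.FullName);
    }
}
```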
If you are happy with the default serialization and metadata but want to augment the metadata or data, or do any of the following:
- Inspect event to add metadata
- Encrypt data
- Change the "type" stored in event store
you can inherit from DefaultAdapter and override the ToBytes and ToEvent methods:
public class AltAdapter : DefaultAdapter
{
protected override byte[] ToBytes(object @event, JObject metadata, out string type, out bool isJson)
{
var bytes = base.ToBytes(@event, metadata, out type, out isJson);
//Add some additional metadata:
metadata["additionalProp"] = true;
//Do something additional with bytes.
return bytes;
}
protected override object ToEvent(byte[] bytes, JObject metadata)
{
//Use the metadata to determine if you need to do something additional to the data
//Do something additional with bytes before handing it off to be deserialized.
return base.ToEvent(bytes, metadata);
}
}
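For the "Encrypt data" case, the "do something additional with bytes" steps could call a pair of helpers like the ones sketched below. `PayloadCrypto` is a hypothetical helper, not part of the plugin. It uses AES from the .NET base class library with a random IV prepended to the ciphertext, and it leaves key management entirely to the caller.

```csharp
using System;
using System.Security.Cryptography;

// Hypothetical helper for illustration; not part of Akka.Persistence.EventStore.
public static class PayloadCrypto
{
    // Encrypts the payload and returns IV + ciphertext as one array.
    public static byte[] Encrypt(byte[] plain, byte[] key)
    {
        using (var aes = Aes.Create())
        {
            aes.Key = key;
            aes.GenerateIV();
            byte[] iv = aes.IV;

            using (var encryptor = aes.CreateEncryptor())
            {
                byte[] cipher = encryptor.TransformFinalBlock(plain, 0, plain.Length);
                var result = new byte[iv.Length + cipher.Length];
                Buffer.BlockCopy(iv, 0, result, 0, iv.Length);
                Buffer.BlockCopy(cipher, 0, result, iv.Length, cipher.Length);
                return result;
            }
        }
    }

    // Splits IV and ciphertext apart again and decrypts the payload.
    public static byte[] Decrypt(byte[] payload, byte[] key)
    {
        using (var aes = Aes.Create())
        {
            var iv = new byte[aes.BlockSize / 8];
            Buffer.BlockCopy(payload, 0, iv, 0, iv.Length);
            aes.Key = key;
            aes.IV = iv;

            using (var decryptor = aes.CreateDecryptor())
            {
                return decryptor.TransformFinalBlock(payload, iv.Length, payload.Length - iv.Length);
            }
        }
    }
}
```

In an adapter like the one above, `ToBytes` would return `PayloadCrypto.Encrypt(bytes, key)` and `ToEvent` would pass `PayloadCrypto.Decrypt(bytes, key)` to the base implementation. Because encrypted bytes are no longer valid JSON, the `isJson` out parameter would presumably need to be set to false in that case.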
You also have the option of creating a new implementation of Akka.Persistence.EventStore.IAdapter:
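public class CustomAdapter : IAdapter
{
public CustomAdapter()
{
}
public EventData Adapt(IPersistentRepresentation persistentMessage)
{
//Implement: convert the persistent message into EventStore event data
throw new NotImplementedException();
}
public IPersistentRepresentation Adapt(ResolvedEvent resolvedEvent, Func<string, IActorRef> actorSelection = null)
{
//Implement: convert the resolved event back into a persistent representation
throw new NotImplementedException();
}
}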
Whichever approach you choose, you need to point the HOCON config at your new adapter:
akka.persistence {
journal {
plugin = "akka.persistence.journal.eventstore"
eventstore {
class = "Akka.Persistence.EventStore.Journal.EventStoreJournal, Akka.Persistence.EventStore"
connection-string = "ConnectTo=tcp://admin:changeit@localhost:1113; HeartBeatTimeout=500"
connection-name = "Akka"
read-batch-size = 500
adapter = "Your.Namespace.YourAdapter, Your.Assembly"
}
}
}
From NuGet Package Manager
Install-Package Akka.Persistence.EventStore.Query
From .NET CLI
dotnet add package Akka.Persistence.EventStore.Query
Please note that you need to configure the write journal anyway, since EventStore
Persistence Query reuses the connection from that journal. It also uses
IEventStoreConnection.SubscribeToStreamFrom, known as a catch-up subscription,
which means that each subscription starts a new connection under the hood.
This is how the official EventStore client library works.
To activate the query journal plugin, add the following line to your HOCON config:
akka.persistence.query.journal.plugin = "akka.persistence.query.journal.eventstore"
This will run the query journal with its default settings. The defaults can be overridden with the following properties in your HOCON config:
- write-plugin: Absolute path to the write journal plugin configuration entry that this query journal will connect to. If undefined (or ""), it will connect to the default journal as specified by the akka.persistence.journal.plugin property.
- max-buffer-size: How many events to fetch in one query (replay) and keep buffered until they are delivered downstream. Default value is 500.
- auto-ack: Whether the query journal should automatically acknowledge delivered events to downstream. Default is false. This is reserved for the next release, for the competing consumers query. See the docs for more details.
akka.persistence.query.journal.eventstore {
write-plugin = ""
max-buffer-size = 500
auto-ack = false
}
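With the query journal configured as above, a standard query can be run through the usual Akka.NET Persistence Query API. The sketch below is an assumption-laden illustration: the read journal type `EventStoreReadJournal` and its namespace are assumed names for the plugin's read journal class, and `QueryExample` is hypothetical. The plugin id string is the one given in this README.

```csharp
using System;
using Akka.Actor;
using Akka.Persistence.Query;
using Akka.Persistence.EventStore.Query; // assumed namespace of EventStoreReadJournal
using Akka.Streams;

public static class QueryExample
{
    // Plugin id taken from the configuration line shown earlier.
    public const string ReadJournalId = "akka.persistence.query.journal.eventstore";

    // Prints every event of one persistent actor as it is delivered.
    public static void PrintEvents(ActorSystem system, string persistenceId)
    {
        // EventStoreReadJournal is an assumed type name; check the package for the actual class.
        var readJournal = PersistenceQuery.Get(system)
            .ReadJournalFor<EventStoreReadJournal>(ReadJournalId);

        var materializer = system.Materializer();

        // Standard Akka.NET query: a stream of events for the given persistence id.
        readJournal
            .EventsByPersistenceId(persistenceId, 0L, long.MaxValue)
            .RunForeach(
                envelope => Console.WriteLine($"{envelope.SequenceNr}: {envelope.Event}"),
                materializer);
    }
}
```

In Akka.NET, `EventsByPersistenceId` is a live query, so the stream is expected to stay open and keep emitting new events until it is cancelled or the actor system shuts down.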
To use standard queries, please refer to the documentation about Persistence Query on the getakka.net website.
- CompetingBy(string groupName, string streamName)