Copy & transform migration strategy for eventstore.
The copy-and-transform strategy copies every event into a new store, transforming it along the way. With this technique the old event store stays intact and a new store is created instead.
`EventStore.Migrator` copies an event store PostgreSQL database from a source to a target.
It allows you to modify the events during the copy. You can transform, remove, aggregate, and alter the serialization format of the events.
The migrator exposes a single `migrate` function. It expects an anonymous function that receives an event stream. You can use any of the functions from Elixir's `Stream` module to transform the event stream. The modified events are appended to the target event store database.
# Remove events: every event whose `event_type` is "UnwantedEvent" is
# dropped from the stream handed back to the migrator.
EventStore.Migrator.migrate(fn events ->
  Stream.reject(events, &(&1.event_type == "UnwantedEvent"))
end)
defmodule OriginalEvent do
  @moduledoc "Example event in its original shape, carrying only a `uuid`."
  defstruct [:uuid]
end

defmodule UpgradedEvent do
  @moduledoc "Example event in its upgraded shape: a `uuid` plus an `additional` field."
  defstruct [:uuid, :additional]
end
# Upgrade events: an event whose data is an `OriginalEvent` is rewritten as an
# "UpgradedEvent" carrying the same uuid; any other event passes through untouched.
upgrade = fn event ->
  case event.data do
    %OriginalEvent{uuid: id} ->
      upgraded = %UpgradedEvent{uuid: id, additional: "upgraded #{id}"}
      %EventStore.RecordedEvent{event | event_type: "UpgradedEvent", data: upgraded}

    _other ->
      event
  end
end

EventStore.Migrator.migrate(fn stream -> Stream.map(stream, upgrade) end)
defmodule SingleEvent do
  @moduledoc "Example event describing one `uuid` that belongs to a `group`."
  defstruct [:uuid, :group]
end

defmodule AggregatedEvent do
  @moduledoc "Example event holding the `uuids` of several single events of one `group`."
  defstruct uuids: [], group: nil
end
# Aggregate events: collapse a chunk of single events into one aggregated event.
#
# * A chunk holding exactly one `SingleEvent` is returned unchanged.
# * A longer chunk whose first event is a `SingleEvent` becomes a one-element
#   list: the first recorded event, with its data replaced by an
#   `AggregatedEvent` that collects the uuid of every event in the chunk and
#   takes its group from the first event.
# * Any other chunk is returned unchanged.
defp aggregate([%{data: %SingleEvent{}}] = events), do: events

defp aggregate([%{data: %SingleEvent{group: group}} = first | _rest] = events) do
  uuids = Enum.map(events, & &1.data.uuid)
  combined = %AggregatedEvent{uuids: uuids, group: group}

  [%EventStore.RecordedEvent{first | data: combined, event_type: "AggregatedEvent"}]
end

defp aggregate(events), do: events
# Consecutive events sharing the same stream id and event type form one chunk;
# each chunk goes through `aggregate/1` and the resulting lists are flattened
# back into a single event stream.
EventStore.Migrator.migrate(fn stream ->
  chunks = Stream.chunk_by(stream, &{&1.stream_id, &1.event_type})
  Stream.flat_map(chunks, &aggregate/1)
end)
Configure the source and target serializers in the environment configuration file (e.g. `config/dev.exs`).
# Serializer setting under the :eventstore application's EventStore.Storage key
# (presumably the source store — confirm against the migrator's documentation).
config :eventstore, EventStore.Storage,
serializer: JsonSerializer,
# ...
# Serializer setting under the :eventstore_migrator application's
# EventStore.Migrator key (presumably the target store — confirm).
config :eventstore_migrator, EventStore.Migrator,
serializer: AlternateSerializer,
# ...
Run the migration without changing the event stream.
# The transform returns the stream it receives, so no event is altered by it.
# NOTE(review): `source_config` and `target_config` are not defined in this
# snippet, and this call passes three arguments where the other examples in
# this document pass only the transform function — confirm the intended arity.
identity = fn stream -> stream end
EventStore.Migrator.migrate(source_config, target_config, identity)
Streams are composable so you can combine multiple transforms in a single migration.