
Copy & transform migration strategy for Elixir EventStore

Primary LanguageElixir

EventStore migrator

Copy & transform migration strategy for eventstore.

Copy and transformation transforms every event to a new store. In this technique the old event store stays intact, and a new store is created instead.


EventStore.Migrator copies an event store PostgreSQL database from a source to a target.

It allows you to modify the events during the copy. You can transform, remove, aggregate, and alter the serialization format of the events.

The migrator exposes a single migrate function. It expects an anonymous function that receives an event stream. You can use any of the functions from Elixir's stream module to mutate the event stream. The modified events are appended to the target event store database.

Remove an unwanted event

EventStore.Migrator.migrate(fn stream ->
    fn (event_data) -> event_data.event_type == "UnwantedEvent" end

Upgrade an event

defmodule OriginalEvent, do: defstruct [uuid: nil]
defmodule UpgradedEvent, do: defstruct [uuid: nil, additional: nil]

EventStore.Migrator.migrate(fn stream ->
    fn (event) ->
      case event.data do
        %OriginalEvent{uuid: uuid} ->
          %EventStore.RecordedEvent{event |
            event_type: "UpgradedEvent",
            data: %UpgradedEvent{uuid: uuid, additional: "upgraded #{uuid}"},
        _ -> event

Aggregate multiple events into one event

defmodule SingleEvent, do: defstruct [uuid: nil, group: nil]
defmodule AggregatedEvent, do: defstruct [uuids: [], group: nil]

# aggregate multiple single events for the same group into one aggregated event
defp aggregate([%{data: %SingleEvent{}}] = events), do: events
defp aggregate([%{data: %SingleEvent{group: group}} = source | _] = events) do
    %EventStore.RecordedEvent{source |
      data: %AggregatedEvent{
        uuids: Enum.map(events, fn event -> event.data.uuid end),
        group: group,
      event_type: "AggregatedEvent",
defp aggregate(events), do: events

EventStore.Migrator.migrate(fn stream ->
  |> Stream.chunk_by(fn event -> {event.stream_id, event.event_type} end)
  |> Stream.map(fn events -> aggregate(events) end)
  |> Stream.flat_map(fn events -> events end)

Migrate serialization format

Configure the source and target serializers in the environment configuration file (e.g. config/dev.exs).

config :eventstore, EventStore.Storage,
  serializer: JsonSerializer,
  # ...

config :eventstore_migrator, EventStore.Migrator,
  serializer: AlternateSerializer,
  # ...

Run the migration without changing the event stream.

EventStore.Migrator.migrate(source_config, target_config, fn stream -> stream end)


Streams are composable so you can combine multiple transforms in a single migration.