Event Sourcing with PostgreSQL
- Introduction
- Example domain
- Event sourcing and CQRS basics
- Solution architecture
- How to run the sample?
Introduction
Usually, our applications operate with the current state of a domain object. But sometimes, we need to know the entire history of changes to the domain object. For example, we may want to know how an order got into its current state.
The audit trail (also called the audit log) is a chronological record of the history and details of the actions that affected the system. An audit trail may be a regulatory or business requirement.
We can store all changes to the domain object state as a sequence of events in an append-only event stream. Event streams then contain the entire history of changes. But how can we be sure that this history is authentic and error-free? We can use event streams as the primary source of truth in the system. To get the current state of an object, we replay all its events in the order they occurred. This pattern is called event sourcing, and the database that stores event streams is called an event store. Event sourcing provides a complete and accurate record of all changes made to a system and is an industry-standard way to implement an audit trail.
There are specialized databases for event sourcing. Developer Advocates working for the companies behind these specialized databases said you shouldn't implement event sourcing with traditional relational or document-oriented databases. Is this true or just a marketing ploy?
Specialized databases for event sourcing are convenient and provide the necessary functionality out of the box. But PostgreSQL, the world's most advanced open-source database, is also suitable for event sourcing. Instead of setting up and maintaining a separate specialized database, you can use PostgreSQL as an event store without additional frameworks or extensions.
This repository provides a reference implementation of an event-sourced system that uses PostgreSQL as an event store. You can also fork the repo and use it as a template for your projects.
Example domain
This sample uses a simplified domain model of the ride-hailing system.
- A rider can place an order for a ride along a route specifying a price.
- A rider can edit an order price to pay more instead of waiting in cases of very high demand.
- A driver can accept an order.
- A driver can complete a previously accepted order.
- An order can be canceled before completion.
Event sourcing and CQRS basics
State-oriented persistence
State-oriented persistence (CRUD) applications store only the latest version of an entity.
Database records represent entities.
When an entity is updated, the corresponding database record is updated too.
SQL INSERT, UPDATE and DELETE statements are used.
Event sourcing
Event sourcing applications persist the state of an entity as a sequence of immutable state-changing events.
Whenever the state of an entity changes, a new event is appended to the list of events.
Only SQL INSERT statements are used.
Events are immutable, so SQL UPDATE and DELETE statements are not used.
The current state of an entity can be restored by replaying all its events.
Event sourcing is closely related to domain-driven design (DDD) and shares some terminology.
An entity in event sourcing is called an aggregate.
A sequence of events for the same aggregate is called a stream.
Event sourcing is best suited for short-lived entities with a small total number of events (e.g., orders).
Restoring the state of a short-lived entity by replaying all its events has a negligible performance impact, so no optimizations for restoring state are required for such entities.
For long-lived entities (e.g., users, bank accounts) with thousands of events, restoring state by replaying all events is not optimal, and snapshotting should be considered.
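To illustrate replaying, here is a minimal Java sketch (assuming JDK 17, as used by the sample) that restores an order aggregate from its stream. The event records and the Order class are hypothetical simplifications of the ride-hailing domain, not the sample's actual classes.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.UUID;

// Hypothetical, simplified domain events for the ride-hailing example.
interface OrderEvent {}
record OrderPlaced(UUID riderId, BigDecimal price) implements OrderEvent {}
record PriceAdjusted(BigDecimal newPrice) implements OrderEvent {}
record OrderAccepted(UUID driverId) implements OrderEvent {}
record OrderCompleted() implements OrderEvent {}
record OrderCancelled() implements OrderEvent {}

enum OrderStatus { PLACED, ACCEPTED, COMPLETED, CANCELLED }

// Hypothetical aggregate: its state is never stored directly, only derived from events.
final class Order {
    OrderStatus status;
    UUID riderId;
    UUID driverId;
    BigDecimal price;
    int version; // number of events applied so far

    // Restore the current state by applying every event of the stream in order.
    static Order replay(List<OrderEvent> stream) {
        Order order = new Order();
        for (OrderEvent event : stream) {
            order.apply(event);
        }
        return order;
    }

    private void apply(OrderEvent event) {
        if (event instanceof OrderPlaced placed) {
            status = OrderStatus.PLACED;
            riderId = placed.riderId();
            price = placed.price();
        } else if (event instanceof PriceAdjusted adjusted) {
            price = adjusted.newPrice();
        } else if (event instanceof OrderAccepted accepted) {
            status = OrderStatus.ACCEPTED;
            driverId = accepted.driverId();
        } else if (event instanceof OrderCompleted) {
            status = OrderStatus.COMPLETED;
        } else if (event instanceof OrderCancelled) {
            status = OrderStatus.CANCELLED;
        }
        version++;
    }
}
```

The version field counts applied events; it plays the same role as the aggregate version used for optimistic concurrency control.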
Snapshotting
Snapshotting is an optimization technique where a snapshot of the aggregate's state is also saved, so an application can restore the current state of the aggregate from the snapshot rather than from all the events (potentially thousands).
On every nth event, take a snapshot of the aggregate by storing its state and version.
To restore an aggregate state:
- first read the latest snapshot,
- then read events forward from the original stream, starting after the version the snapshot points to.
Querying the data
It's easy to find an aggregate by ID, but other queries are difficult. Since aggregates are stored as append-only lists of immutable events, querying the data with SQL the way we are used to is impossible. To find an aggregate by some field, we would first need to read all the events and replay them to restore every aggregate.
To bring back all the querying power a relational database has to offer, we can create a dedicated read model derived from the event stream.
The event stream is the write model and the primary source of truth.
The read model is a "denormalized" view of the write model, allowing faster and more convenient querying. Read models are projections of the system state. Therefore, read models are also known as projections.
Projections provide a view of data for a single aggregate type or perform aggregations and combine data from multiple aggregate types.
That's where CQRS comes in handy.
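As an illustration of a projection, here is a minimal in-memory Java sketch. It assumes a hypothetical OrderStatusChanged event carrying the order ID and its new status; a real projection would typically be a database table updated by an event handler.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical event: each order event is assumed to carry its aggregate ID and new status.
record OrderStatusChanged(UUID orderId, String status) {}

// A projection (read model) derived from the event stream. It keeps the latest status of
// each order and the number of orders per status, so both queries become simple lookups
// instead of replaying every stream.
final class OrderStatusProjection {
    private final Map<UUID, String> statusByOrder = new HashMap<>();
    private final Map<String, Integer> countByStatus = new HashMap<>();

    void handle(OrderStatusChanged event) {
        String previous = statusByOrder.put(event.orderId(), event.status());
        if (previous != null) {
            countByStatus.merge(previous, -1, Integer::sum); // the order left its old status
        }
        countByStatus.merge(event.status(), 1, Integer::sum);
    }

    String statusOf(UUID orderId) {
        return statusByOrder.get(orderId);
    }

    int countWithStatus(String status) {
        return countByStatus.getOrDefault(status, 0);
    }
}
```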
CQRS
Command-query responsibility segregation (CQRS) stands for segregating the responsibility between commands (write requests) and queries (read requests). The write requests and the read requests are processed by different handlers.
A command generates zero or more events or results in an error.
CQRS is a self-sufficient architectural pattern and doesn't require event sourcing. But in practice, event sourcing is usually used in conjunction with CQRS: the event store serves as the write database, and an SQL or NoSQL database serves as the read database.
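The command side can be sketched as follows: a command is validated against the current state and yields zero or more events, or fails with an error. The types below are hypothetical, and the rule that only a placed order can have its price changed is an assumption made for this example.

```java
import java.math.BigDecimal;
import java.util.List;

// Hypothetical, simplified types for the order write side.
enum Status { PLACED, ACCEPTED, COMPLETED, CANCELLED }

record OrderState(Status status, BigDecimal price) {}

interface Event {}
record PriceAdjusted(BigDecimal newPrice) implements Event {}
record OrderCancelled() implements Event {}

final class OrderCommandHandler {
    // Assumed rule: the price can be changed only while the order waits for a driver.
    static List<Event> adjustPrice(OrderState state, BigDecimal newPrice) {
        if (state.status() != Status.PLACED) {
            throw new IllegalStateException("Cannot change the price of a " + state.status() + " order");
        }
        if (state.price().compareTo(newPrice) == 0) {
            return List.of(); // nothing changes: zero events
        }
        return List.of(new PriceAdjusted(newPrice));
    }

    // An order can be cancelled only before completion.
    static List<Event> cancel(OrderState state) {
        if (state.status() == Status.COMPLETED || state.status() == Status.CANCELLED) {
            throw new IllegalStateException("Cannot cancel a " + state.status() + " order");
        }
        return List.of(new OrderCancelled());
    }
}
```

Keeping the handler free of I/O makes business rules easy to test; persisting the returned events is left to the event store.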
Event handlers
Commands generate events. Events are processed by event handlers. As part of event processing, we may need to update projections, send a message to a message broker, or make an API call.
There are two types of event handlers: synchronous and asynchronous.
Storing the write model and read model in the same database allows for transactional updates of the read model. Each time we append a new event, the projection is updated synchronously in the same transaction. The projection is consistent with the event stream.
When an event handler communicates with an external system or middleware (e.g., sends a message to Kafka), it should run asynchronously after the transaction updating the write model. Asynchronous execution leads to eventual consistency.
Communication with external systems should not occur in the same transaction that updates the write model. The external system call may succeed while the transaction is later rolled back, resulting in an inconsistency.
In any case, distributed systems should be designed with eventual consistency in mind.
Domain events vs Integration events
Events in event sourcing are domain events. The domain event is a part of a bounded context and should not be used "as-is" for integration with other bounded contexts.
For communication between bounded contexts, integration events are used. An integration event represents the current state of an aggregate, not just a change to the aggregate, as a domain event does.
Advantages of CQRS
- Independent scaling of the read and write databases.
- Optimized data schema for the read database (e.g. the read databases can be denormalized).
- Simpler queries (e.g. complex JOIN operations can be avoided).
Advantages of event sourcing
- A true history of the system (audit and traceability). An industry standard for implementing an audit trail.
- Ability to put the system in any prior state (e.g. for debugging).
- New read-side projections can be created later, as needed, from existing events. This allows responding to future needs and new requirements.
Solution architecture
PostgreSQL can be used as an event store. It natively supports appending events, concurrency control and reading events. Subscribing to events requires additional implementation.
Component diagram
ER diagram
Events are stored in the ES_EVENT table.
Optimistic concurrency control
The latest aggregate version is stored in the ES_AGGREGATE table.
Version checking is used for optimistic concurrency control.
Version checking uses version numbers to detect conflicting updates (and to prevent lost updates).
Appending an event consists of two SQL statements executed in a single transaction:
- check that the actual and expected versions match and increment the version
UPDATE ES_AGGREGATE SET VERSION = :newVersion WHERE ID = :aggregateId AND VERSION = :expectedVersion
- insert new event
INSERT INTO ES_EVENT (TRANSACTION_ID, AGGREGATE_ID, VERSION, EVENT_TYPE, JSON_DATA) VALUES(pg_current_xact_id(), :aggregateId, :version, :eventType, :jsonObj::json) RETURNING ID, TRANSACTION_ID::text, EVENT_TYPE, JSON_DATA
pg_current_xact_id() returns the current transaction's ID. The need for this will be explained later.
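The effect of the version check can be sketched in memory. InMemoryEventStore below is a hypothetical stand-in for the ES_AGGREGATE and ES_EVENT tables, not the sample's JDBC code: the exception it throws corresponds to the UPDATE affecting zero rows, and it assumes that a new aggregate starts at version 0.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory stand-in for the ES_AGGREGATE and ES_EVENT tables.
final class InMemoryEventStore {
    record StoredEvent(UUID aggregateId, int version, String eventType) {}

    static final class OptimisticConcurrencyException extends RuntimeException {
        OptimisticConcurrencyException(String message) {
            super(message);
        }
    }

    private final Map<UUID, Integer> aggregateVersions = new HashMap<>();
    private final List<StoredEvent> events = new ArrayList<>();

    // synchronized plays the role of the single transaction around both statements.
    synchronized StoredEvent append(UUID aggregateId, int expectedVersion, String eventType) {
        // UPDATE ES_AGGREGATE SET VERSION = :newVersion
        //   WHERE ID = :aggregateId AND VERSION = :expectedVersion
        int actualVersion = aggregateVersions.getOrDefault(aggregateId, 0); // assumption: new aggregates start at 0
        if (actualVersion != expectedVersion) {
            // The UPDATE would match zero rows: another writer appended first.
            throw new OptimisticConcurrencyException(
                    "Expected version " + expectedVersion + " but was " + actualVersion);
        }
        int newVersion = expectedVersion + 1;
        aggregateVersions.put(aggregateId, newVersion);
        // INSERT INTO ES_EVENT (...) VALUES (...)
        StoredEvent event = new StoredEvent(aggregateId, newVersion, eventType);
        events.add(event);
        return event;
    }

    synchronized int versionOf(UUID aggregateId) {
        return aggregateVersions.getOrDefault(aggregateId, 0);
    }

    synchronized int eventCount() {
        return events.size();
    }
}
```

A writer that loses the race typically reloads the aggregate, re-validates the command against the new state, and retries.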
Snapshotting
On every nth event, insert the aggregate state (snapshot) into the ES_AGGREGATE_SNAPSHOT table, specifying the version:
INSERT INTO ES_AGGREGATE_SNAPSHOT (AGGREGATE_ID, VERSION, JSON_DATA)
VALUES (:aggregateId, :version, :jsonObj::json)
Snapshotting can be enabled, disabled and configured per aggregate type in application.yml:
event-sourcing:
  snapshotting:
    # com.example.eventsourcing.domain.AggregateType
    ORDER:
      enabled: true
      # Create a snapshot on every nth event
      nth-event: 10
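A minimal sketch of how the enabled and nth-event settings could be interpreted, assuming versions start at 1 and a snapshot is taken whenever the new version is a multiple of nth-event. SnapshotPolicy is a hypothetical class, not part of the sample.

```java
// Hypothetical interpretation of the per-aggregate-type snapshot settings.
final class SnapshotPolicy {
    private final boolean enabled;
    private final int nthEvent;

    SnapshotPolicy(boolean enabled, int nthEvent) {
        if (nthEvent <= 0) {
            throw new IllegalArgumentException("nth-event must be positive");
        }
        this.enabled = enabled;
        this.nthEvent = nthEvent;
    }

    // Called after appending an event that produced newVersion.
    boolean shouldSnapshot(int newVersion) {
        return enabled && newVersion % nthEvent == 0;
    }
}
```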
Loading any revision of the aggregate
To restore any revision of the aggregate:
- first read the latest snapshot at or before the requested revision
SELECT a.AGGREGATE_TYPE, s.JSON_DATA FROM ES_AGGREGATE_SNAPSHOT s JOIN ES_AGGREGATE a ON a.ID = s.AGGREGATE_ID WHERE s.AGGREGATE_ID = :aggregateId AND (:version IS NULL OR s.VERSION <= :version) ORDER BY s.VERSION DESC LIMIT 1
- then read forward from the event stream from the revision the snapshot points to
SELECT ID, TRANSACTION_ID::text, EVENT_TYPE, JSON_DATA FROM ES_EVENT WHERE AGGREGATE_ID = :aggregateId AND (:fromVersion IS NULL OR VERSION > :fromVersion) AND (:toVersion IS NULL OR VERSION <= :toVersion) ORDER BY VERSION ASC
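The two queries can be mirrored in memory to show which rows take part in restoring a given revision. SnapshotRow, EventRow and RevisionLoader are hypothetical, and the aggregate is reduced to a counter so that the replay logic stays visible.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical in-memory rows standing in for ES_AGGREGATE_SNAPSHOT and ES_EVENT.
record SnapshotRow(int version, int state) {}
record EventRow(int version, int delta) {}

final class RevisionLoader {
    // Restores the aggregate as of toVersion (null means the latest revision).
    static int load(List<SnapshotRow> snapshots, List<EventRow> events, Integer toVersion) {
        // 1) Latest snapshot with VERSION <= toVersion (ORDER BY VERSION DESC LIMIT 1).
        Optional<SnapshotRow> snapshot = snapshots.stream()
                .filter(s -> toVersion == null || s.version() <= toVersion)
                .max(Comparator.comparingInt(SnapshotRow::version));
        int fromVersion = snapshot.map(SnapshotRow::version).orElse(0);
        int state = snapshot.map(SnapshotRow::state).orElse(0);

        // 2) Events with fromVersion < VERSION <= toVersion, in ascending version order.
        List<EventRow> tail = events.stream()
                .filter(e -> e.version() > fromVersion)
                .filter(e -> toVersion == null || e.version() <= toVersion)
                .sorted(Comparator.comparingInt(EventRow::version))
                .toList();
        for (EventRow e : tail) {
            state += e.delta(); // "apply" the event to the state
        }
        return state;
    }
}
```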
Synchronously updating projections
Using PostgreSQL as both the event store and the read database allows for transactional updates of the read model. Each time we append a new event, the projection is updated synchronously in the same transaction. This is a big advantage, because consistency between the write and read models is otherwise hard to achieve.
You can't get such consistent projections when a separate database is used as the event store.
Asynchronously sending integration events to a message broker
Integration events should be sent asynchronously after the transaction updating the write model.
PostgreSQL doesn't provide event subscriptions out of the box, so the solution is the Transactional Outbox pattern. A service that uses a database inserts events into an outbox table as part of its local transaction, and a separate process publishes the events inserted into the database to a message broker.
We may have multiple asynchronous event handlers, so-called subscriptions.
The subscription concept is required to keep track of delivered events separately for each event handler.
The last event processed by each event handler (subscription) is stored in a separate table, ES_EVENT_SUBSCRIPTION.
New events are processed by polling the ES_EVENT table.
Since multiple backend instances can run in parallel, we need to ensure that each event is processed by only one of them. We don't want more than one event handler instance to handle the same event.
This is achieved by acquiring locks on the rows of the ES_EVENT_SUBSCRIPTION table.
We lock the row of the currently processed subscription (SELECT FOR UPDATE).
To avoid blocking other backend instances, we skip already locked rows (SELECT FOR UPDATE SKIP LOCKED) and lock the "next" subscription.
It allows multiple backend instances to select different subscriptions that do not overlap.
This way, we improve availability and scalability.
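The skip-locked behavior can be approximated in plain Java with one lock flag per subscription row: an instance claims the first row it can lock and skips rows held by others instead of waiting. This is an in-memory analogy only, and the subscription names used with it are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

// In-memory analogy of SELECT ... FOR UPDATE SKIP LOCKED on ES_EVENT_SUBSCRIPTION.
// The map is filled once in the constructor and only read afterwards.
final class SubscriptionRows {
    private final Map<String, AtomicBoolean> locks = new LinkedHashMap<>();

    SubscriptionRows(String... subscriptionNames) {
        for (String name : subscriptionNames) {
            locks.put(name, new AtomicBoolean(false));
        }
    }

    // Non-blocking: locked rows are skipped; empty if every row is already locked.
    Optional<String> lockNextAvailable() {
        for (Map.Entry<String, AtomicBoolean> row : locks.entrySet()) {
            if (row.getValue().compareAndSet(false, true)) {
                return Optional.of(row.getKey());
            }
        }
        return Optional.empty();
    }

    // Corresponds to the transaction commit or rollback releasing the row lock.
    void release(String subscriptionName) {
        locks.get(subscriptionName).set(false);
    }
}
```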
The event subscription processor polls the ES_EVENT_SUBSCRIPTION table every second (the interval is configurable) for new events and processes them:
- read the last transaction ID and event ID processed by the subscription and acquire a lock
SELECT LAST_TRANSACTION_ID::text, LAST_EVENT_ID FROM ES_EVENT_SUBSCRIPTION WHERE SUBSCRIPTION_NAME = :subscriptionName FOR UPDATE SKIP LOCKED
- read new events
SELECT e.ID, e.TRANSACTION_ID::text, e.EVENT_TYPE, e.JSON_DATA FROM ES_EVENT e JOIN ES_AGGREGATE a on a.ID = e.AGGREGATE_ID WHERE a.AGGREGATE_TYPE = :aggregateType AND (e.TRANSACTION_ID, e.ID) > (:lastProcessedTransactionId::xid8, :lastProcessedEventId) AND e.TRANSACTION_ID < pg_snapshot_xmin(pg_current_snapshot()) ORDER BY e.TRANSACTION_ID ASC, e.ID ASC
A comparison like (a, b) > (c, d) is a row comparison and is equivalent to a > c OR (a = c AND b > d).
- update the last transaction ID and event ID processed by the subscription
UPDATE ES_EVENT_SUBSCRIPTION SET LAST_TRANSACTION_ID = :lastProcessedTransactionId::xid8, LAST_EVENT_ID = :lastProcessedEventId WHERE SUBSCRIPTION_NAME = :subscriptionName
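The filtering and ordering done by the new-events query can be mirrored in Java, which makes the row comparison explicit. EventRow and NewEventsQuery are hypothetical, and long is used for transaction IDs for simplicity (xid8 is an unsigned 64-bit type).

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory view of ES_EVENT rows: (TRANSACTION_ID, ID).
record EventRow(long transactionId, long eventId) {}

final class NewEventsQuery {
    // Row comparison (a, b) > (c, d), i.e. a > c OR (a = c AND b > d).
    static boolean isAfter(EventRow e, long lastTxId, long lastEventId) {
        return e.transactionId() > lastTxId
                || (e.transactionId() == lastTxId && e.eventId() > lastEventId);
    }

    // Events after the last processed position whose transaction ID is below xmin,
    // ordered by (TRANSACTION_ID, ID) like the SQL query.
    static List<EventRow> newEvents(List<EventRow> table, long lastTxId, long lastEventId, long xmin) {
        return table.stream()
                .filter(e -> isAfter(e, lastTxId, lastEventId))
                .filter(e -> e.transactionId() < xmin)
                .sorted(Comparator.comparingLong(EventRow::transactionId)
                        .thenComparingLong(EventRow::eventId))
                .toList();
    }
}
```

A brand-new subscription can be modeled as starting from position (0, 0), so its first poll returns every event below xmin.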
Reliable transactional outbox with PostgreSQL
Using only the event ID to track events processed by the subscription is unreliable and can result in lost events.
The ID column of the ES_EVENT table is of type BIGSERIAL.
It's a notational convenience for creating ID columns whose default values are assigned from a SEQUENCE generator.
PostgreSQL sequences can't be rolled back.
SELECT nextval('ES_EVENT_ID_SEQ') increments and returns the sequence value.
Even if the transaction is not yet committed, the new sequence value becomes visible to other transactions.
If transaction #2 started after transaction #1 but committed first, the event subscription processor can read the events created by transaction #2, update the last processed event ID, and thus lose the events created by transaction #1.
We use transaction ID with event ID to build a reliable PostgreSQL polling mechanism that doesn't lose events.
Each event is supplemented with the current transaction ID.
pg_current_xact_id() returns the current transaction's ID of type xid8.
xid8 values increase strictly monotonically and cannot be reused in the lifetime of a database cluster.
Only events created by transactions with IDs below the xmin of the current snapshot are "safe" to process.
pg_current_snapshot() returns the current snapshot, a data structure showing which transaction IDs are currently in progress.
pg_snapshot_xmin(pg_snapshot) returns the xmin of a snapshot.
xmin is the lowest transaction ID that is still active.
All transaction IDs less than xmin are either committed and visible, or rolled back.
Even if transaction #2 started after transaction #1 and committed first, the events it created won't be read by the event subscription processor until transaction #1 is committed.
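The race can be simulated with two transactions: transaction #1 (ID 10) inserts event 1, transaction #2 (ID 11) inserts event 2, and #2 commits while #1 is still in progress. The sketch below is a hypothetical in-memory model that compares polling by event ID alone with polling guarded by xmin; as in PostgreSQL, readers see only committed rows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of ES_EVENT rows, listed in (txId, eventId) order.
final class LostEventDemo {
    record Row(long txId, long eventId, boolean committed) {}

    // Polling by event ID only.
    static List<Long> pollById(List<Row> rows, long lastEventId) {
        List<Long> result = new ArrayList<>();
        for (Row r : rows) {
            if (r.committed() && r.eventId() > lastEventId) {
                result.add(r.eventId());
            }
        }
        return result;
    }

    // Polling by (transaction ID, event ID), reading only transactions below xmin.
    static List<Long> pollWithXmin(List<Row> rows, long lastTxId, long lastEventId, long xmin) {
        List<Long> result = new ArrayList<>();
        for (Row r : rows) {
            boolean after = r.txId() > lastTxId
                    || (r.txId() == lastTxId && r.eventId() > lastEventId);
            if (r.committed() && after && r.txId() < xmin) {
                result.add(r.eventId());
            }
        }
        return result;
    }
}
```

Polling by ID alone reads event 2, advances the position to 2, and never sees event 1 once transaction #1 commits. With the xmin guard, nothing is read while transaction #1 (the current xmin) is active, and both events are read in order afterwards.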
Database polling
To get new events from the ES_EVENT table, the application has to poll the database.
The shorter the polling period, the shorter the delay between persisting a new event and processing it by the subscription.
But some lag is inevitable. If the polling period is 1 second, a new event waits up to 1 second before the next poll picks it up.
The polling mechanism implementation ScheduledEventSubscriptionProcessor uses the Spring @Scheduled annotation to poll the database with a fixed period.
Polling-based event subscription processing can be enabled and configured in application.yml:
event-sourcing:
  subscriptions: polling # Enable database polling subscription processing
  polling-subscriptions:
    polling-initial-delay: PT1S
    polling-interval: PT1S
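The sample relies on Spring's @Scheduled; as a framework-free analogy, a fixed-delay poller can be built on the JDK's ScheduledExecutorService. FixedDelayPoller and its pollOnce callback are hypothetical: the callback would run the lock, read and update statements above for each subscription. The ISO-8601 durations from application.yml, such as PT1S, can be parsed with Duration.parse.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-JDK poller; not the sample's ScheduledEventSubscriptionProcessor.
final class FixedDelayPoller implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    FixedDelayPoller(Runnable pollOnce, Duration initialDelay, Duration interval) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                pollOnce.run();
            } catch (RuntimeException e) {
                // An exception escaping the task would cancel all future runs, so report and continue.
                System.err.println("Polling failed: " + e);
            }
        }, initialDelay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Fixed delay is a design choice of this sketch: a slow poll is followed by a full interval of rest instead of back-to-back catch-up runs.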
Database polling alternative
To reduce the lag associated with database polling, the polling period can be set to a very low value, such as 1 second. But this means that there will be 3600 database queries per hour and 86400 per day, even if there are no new events.
The PostgreSQL LISTEN and NOTIFY feature can be used instead of polling.
This mechanism allows for sending asynchronous notifications across database connections.
Notifications are not sent directly from the application, but via a database trigger on a table.
To use this functionality, an unshared PgConnection that remains open is required.
The long-lived dedicated JDBC Connection for receiving notifications has to be created using the DriverManager API instead of being obtained from a pooled DataSource.
The PostgreSQL JDBC driver can't receive asynchronous notifications and must poll the backend to check whether any notifications were issued.
A timeout can be given to the poll function getNotifications(int timeoutMillis), but then the execution of statements from other threads on that connection will block.
When timeoutMillis = 0, the call blocks forever or until at least one notification has been received.
This means notifications are delivered almost immediately, without a lag.
If more than one notification is about to be received, these will be returned in one batch.
This solution significantly reduces the number of issued queries and also solves the lag problem that the polling solution suffers from.
The Listen/Notify mechanism implementation PostgresChannelEventSubscriptionProcessor is inspired by the Spring Integration class PostgresChannelMessageTableSubscriber.
Listen/Notify event subscription processing can be enabled in application.yml:
event-sourcing:
  subscriptions: postgres-channel # Enable Listen/Notify event subscription processing
This mechanism is used by default because it is more efficient.
Adding new asynchronous event handlers
After the backend restarts, existing subscriptions process only the events after the last processed one, not everything from the beginning.
On the first poll, new subscriptions (event handlers) read and process all events. Be careful: if there are many events, processing them may take a long time.
Drawbacks
Using PostgreSQL as an event store has a lot of advantages, but there are also drawbacks.
- Asynchronous event handlers can process the same event more than once. A handler might crash after processing an event but before recording that it has done so. When it restarts, it processes the same event again (e.g., sends the same integration event again). Integration events are therefore delivered with an at-least-once delivery guarantee. An exactly-once delivery guarantee is hard to achieve because of the dual-write problem: the need to atomically update the database and publish messages without two-phase commit (2PC). Consumers of integration events should be idempotent and filter out duplicates and out-of-order events.
- Asynchronous event handling results in eventual consistency between the write model and sent integration events. Polling the database table for new events with a fixed delay introduces a consistency lag of up to the interval between polls (1 second by default), plus processing time.
- A long-running transaction in the same database effectively "pauses" all asynchronous event handlers. pg_snapshot_xmin(pg_snapshot) returns the ID of this long-running transaction, so events created by all later transactions are read by the event subscription processor only after the long-running transaction ends.
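Here is a minimal sketch of an idempotent consumer, assuming each integration event carries the order ID, the aggregate version and the full order state. Because an integration event represents the current state of the aggregate, an event whose version is not higher than the last applied one can be safely ignored, whether it is a duplicate or arrived out of order. OrderIntegrationEvent and IdempotentOrderConsumer are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical integration event: the full order state plus the aggregate version.
record OrderIntegrationEvent(UUID orderId, int version, String status) {}

// Remembers the highest version applied per order and drops anything not newer.
final class IdempotentOrderConsumer {
    private final Map<UUID, Integer> lastAppliedVersion = new HashMap<>();
    private final Map<UUID, String> currentStatus = new HashMap<>();

    // Returns true if the event was applied, false if it was filtered out.
    boolean consume(OrderIntegrationEvent event) {
        int last = lastAppliedVersion.getOrDefault(event.orderId(), 0);
        if (event.version() <= last) {
            return false; // duplicate or older than what was already applied
        }
        lastAppliedVersion.put(event.orderId(), event.version());
        currentStatus.put(event.orderId(), event.status());
        return true;
    }

    String statusOf(UUID orderId) {
        return currentStatus.get(orderId);
    }
}
```

In a real consumer, the last applied version would be stored in the consumer's own database, in the same transaction as the state change, so the filter survives restarts.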
Class diagrams
This reference implementation can be easily extended to comply with your domain model.
Class diagram of the domain model
Class diagram of the projections
Class diagram of the service layer
How to run the sample?
- Download & install SDKMAN!.
- Install JDK 17
sdk list java
sdk install java 17.0.x-tem
- Install Docker and Docker Compose.
- Build Java project and Docker image
./gradlew clean build bootBuildImage -i
- Run PostgreSQL, Kafka and event-sourcing-app
docker compose up -d --scale event-sourcing-app=2 # wait a few minutes
- Follow the logs of the application
docker compose logs -f event-sourcing-app
- Run E2E tests and see the output
E2E_TESTING=true ./gradlew clean test -i
- Explore the database using the Adminer database management tool at http://localhost:8181. Find the database name, user, and password in the docker-compose.yml.
You can also manually call the REST API endpoints.
- Install curl and jq
sudo apt install curl jq
- Place new order
ORDER_ID=$(curl -s -X POST http://localhost:8080/orders -d '{"riderId":"63770803-38f4-4594-aec2-4c74918f7165","price":"123.45","route":[{"address":"Kyiv, 17A Polyarna Street","lat":50.51980052414157,"lon":30.467197278948536},{"address":"Kyiv, 18V Novokostyantynivska Street","lat":50.48509161169076,"lon":30.485170724431292}]}' -H 'Content-Type: application/json' | jq -r .orderId)
- Get the placed order
curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq
- Accept the order
curl -s -X PUT http://localhost:8080/orders/$ORDER_ID -d '{"status":"ACCEPTED","driverId":"2c068a1a-9263-433f-a70b-067d51b98378"}' -H 'Content-Type: application/json'
- Get the accepted order
curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq
- Complete the order
curl -s -X PUT http://localhost:8080/orders/$ORDER_ID -d '{"status":"COMPLETED"}' -H 'Content-Type: application/json'
- Get the completed order
curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq
- Try to cancel a completed order to simulate a business rule violation
curl -s -X PUT http://localhost:8080/orders/$ORDER_ID -d '{"status":"CANCELLED"}' -H 'Content-Type: application/json' | jq
- Print integration events
docker compose exec kafka /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic order-events --from-beginning --property print.key=true --timeout-ms 10000