Note: This project is not ready for production yet, but it will be very soon. If you're interested in contributing, send us a note at pgevents@fylehq.com
This is a utility to generate table events from Postgres using logical replication and push it to RabbitMQ.
What are you dithering about, you ask?
Imagine you have a table in Postgres DB. You perform some inserts, updates or deletes - either via an application or maybe via SQL scripts. This utility will stream a set of logical Insert, Update or Delete events and put them in a RabbitMQ exchange.
For example, if you have a table like this:
create table users (
id int primary key,
full_name text not null
);
If you run the following SQL,
insert into users(id, full_name) values (1, 'Mikael Åkerfeldt');
You'll see a message in RabbitMQ exchange like this with routing key public.users
:
{
"table_name": "public.users",
"old": {},
"new": {
"id": 1,
"full_name": "Mikael Åkerfeldt"
},
"id": 1,
"diff": {
"id": 1,
"full_name": "Mikael Åkerfeldt"
},
"action": "I"
}
You can now run async code that works off this event. E.g. sending an email to a newly signed up user. The possibilities are endless.
You can read more about how logical replication is being used to raise events here
To try out pgevents, you'll need a running Postgres DB, RabbitMQ, a consumer that can read the events etc. To simplify this, we've provided a way to quickly see how things work using docker compose.
Just run all the services by:
docker compose up --build
Now, open up a terminal and connect to the DB using the following command:
PGPASSWORD=postgres psql -h localhost -U postgres dummy
Now execute the following SQL:
insert into users(id, full_name) values (1, 'Mikael Åkerfeldt');
Now go to the terminal where docker compose
is running and you'll see the following event being printed by the event_logger
process:
event_logger_1 | routing_key public.users
event_logger_1 | event Event <table_name: public.users>
event_logger_1 | event {'table_name': 'public.users', 'old': {}, 'new': {'id': 1, 'full_name': 'Mikael Åkerfeldt'}, 'id': 1, 'diff': {'id': 1, 'full_name': 'Mikael Åkerfeldt'}, 'action': 'I'}
You'll need to use Postgres DB >= 10.0.
You'll need to have the following entries in your postgres.conf file.
shared_preload_libraries = 'wal2json'
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 4 # max number of walsender processes (change requires restart)
max_replication_slots = 4 # max number of replication slots (change requires restart)
If you're using AWS RDS, then you'll need to do a few more things. This blog highlights the steps.
You'll need to run set replica identity to full for the tables in question. Example:
alter table users replica identity full;
If you don't do this, the old value will not be sent during updates and deletes and the diffs may be incorrect.
Set the following environment variables to connect to PostgreSQL >= 10 and to RabbitMQ as broker
export PGHOST=xxx
export PGPORT=5432
export PGDATABASE=test
export PGUSER=postgres
export PGPASSWORD=xxx
export PGSLOT=pgevents
export PGTABLES=public.users
export RABBITMQ_URL=yyy
export RABBITMQ_EXCHANGE=table_exchange
export RABBITMQ_QUEUE_NAME=audit
Now run the PGEvents producer process using the published docker image like this:
docker run -i -e PGHOST -e PGPORT -e PGDATABASE -e PGUSER -e PGPASSWORD -e PGSLOT -e RABBITMQ_URL -e RABBITMQ_EXCHANGE --rm \
fylehq/pgevents producer
To read data from rabbitmq exchange and print it to stdout
docker run -i -e RABBITMQ_URL -e RABBITMQ_EXCHANGE -e RABBITMQ_QUEUE_NAME --rm \
fylehq/pgevents consumer_debug
For detailed information, use the help flag
$ docker run -i -e PGHOST -e PGPORT -e PGDATABASE -e PGUSER -e PGPASSWORD -e PGSLOT --rm \
fylehq/pgevents producer --help
Usage: producer [OPTIONS]
Options:
--pghost TEXT Postgresql Host ($PGHOST) [required]
--pgport TEXT Postgresql Host ($PGPORT) [required]
--pgdatabase TEXT Postgresql Database ($PGDATABASE) [required]
--pguser TEXT Postgresql User ($PGUSER) [required]
--pgpassword TEXT Postgresql Password ($PGPASSWORD) [required]
--pgslot TEXT Postgresql Replication Slot Name ($PGSLOT)
[required]
--pgtables TEXT Restrict to specific tables e.g.
public.transactions,public.reports
--rabbitmq-url TEXT RabbitMQ url ($RABBITMQ_URL) [required]
--rabbitmq-exchange TEXT RabbitMQ exchange ($RABBITMQ_EXCHANGE) [required]
--help Show this message and exit.
To run PGEvents in production, you can simply use the published docker image that is available at:
docker.io/fylehq/pgevents:latest
Be sure to pass all the environment variables highlighted in the section above so that the producer knows how to connect to your Postgres DB and RabbitMQ.
Run the following commands:
# Start producer, database and
docker-compose up --build
# In a new terminal run the tests now
docker-compose run --rm --entrypoint='python -m pytest -vvv' producer
If you're interested in contributing to this project, do send us a note at pgevents@fylehq.com