This demo is a fork of the Dagster MDS example with the addition of an observable source asset and downstream multi-asset sensors.
The final result is a project structure like this:
To get started:
pip install -e ".[dev]"
To keep things running on a single machine, we'll use a local postgres instance as both the source and the destination for our data. You can imagine the "source" database as some online transactional database, and the "destination" as a data warehouse (something like Snowflake).
To get a postgres instance with the required source and destination databases running on your machine, you can run:
$ docker pull postgres
$ docker run --name mds-demo -p 5432:5432 -e POSTGRES_PASSWORD=password -d postgres
$ PGPASSWORD=password psql -h localhost -p 5432 -U postgres -d postgres -c "CREATE DATABASE postgres_replica;"
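If you want to confirm both databases are reachable before moving on, a quick check along these lines works (this assumes psycopg2 is installed, which is not necessarily part of this project's dependencies):

```python
import psycopg2

# Sanity check: connect to the source and destination databases created above.
for dbname in ("postgres", "postgres_replica"):
    conn = psycopg2.connect(
        host="localhost", port=5432, user="postgres",
        password="password", dbname=dbname,
    )
    print(f"connected to {dbname}, server version {conn.server_version}")
    conn.close()
```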
Now, you'll want to get Airbyte running locally. The full instructions can be found here, but if you just want to run some commands (in a separate terminal):
$ git clone https://github.com/airbytehq/airbyte.git
$ cd airbyte
$ docker-compose up
Once you've done this, you should be able to go to http://localhost:8000 and see Airbyte's UI. The default username is airbyte and the default password is password.
Now, you'll want to seed some data into the empty database you just created.
There's a script provided that handles all of this for you, which you can run with:
$ python -m dagster_proj.utils.setup_postgres
This script will also create the sample users data file at data/users.csv.
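The real script lives in the dagster_proj.utils.setup_postgres module; as a rough illustration of what a seeding step like this does (table names, columns, and sample rows below are made up, and psycopg2 is assumed):

```python
import csv
import os

import psycopg2

# Illustrative only: the actual setup_postgres module may use different
# table names, columns, and sample data.
conn = psycopg2.connect(
    host="localhost", port=5432, user="postgres",
    password="password", dbname="postgres",
)
with conn, conn.cursor() as cur:
    cur.execute(
        "CREATE TABLE IF NOT EXISTS orders "
        "(id SERIAL PRIMARY KEY, user_id INT, order_time TIMESTAMP)"
    )
    cur.execute(
        "INSERT INTO orders (user_id, order_time) VALUES (1, now()), (2, now())"
    )

# Write the sample users file that the observable source asset watches.
os.makedirs("data", exist_ok=True)
with open("data/users.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["user_id", "name"])
    writer.writerows([[1, "alice"], [2, "bob"]])
```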
This example uses Dagster to manage Airbyte ingestion as code.
To set up the Airbyte connections between the Postgres source and destination, run:
dagster-airbyte apply --module dagster_proj.assets.airbyte_iaac:airbyte_reconciler
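The airbyte_reconciler object referenced by that command is built with dagster-airbyte's managed-config ("ingestion as code") API. A minimal sketch, assuming the local Airbyte instance from the earlier step; the real module also defines AirbyteConnection objects for the Postgres-to-Postgres sync, which are omitted here:

```python
from dagster_airbyte import AirbyteManagedElementReconciler, airbyte_resource

# Points at the Airbyte instance started with docker-compose above.
airbyte_instance = airbyte_resource.configured(
    {
        "host": "localhost",
        "port": "8000",
        "username": "airbyte",
        "password": "password",
    }
)

# In the real module the connections list holds AirbyteConnection objects
# describing the source, destination, and stream sync modes.
airbyte_reconciler = AirbyteManagedElementReconciler(
    airbyte=airbyte_instance,
    connections=[],
)
```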
dagster dev
Open your browser to view the resulting assets. The job "sync_database" can be run to trigger the Airbyte sync and a downstream run of the dbt model "orders_cleaned". The job is also configured to run daily on a cron schedule.
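A job plus schedule like this is usually only a few lines; a minimal sketch, where the asset selection and cron string are assumptions rather than copied from the project:

```python
from dagster import AssetSelection, ScheduleDefinition, define_asset_job

# Materialize orders_cleaned and everything upstream of it (the Airbyte tables).
sync_database_job = define_asset_job(
    name="sync_database",
    selection=AssetSelection.keys("orders_cleaned").upstream(),
)

# Run the job once a day at midnight; the project's actual cron string may differ.
sync_database_schedule = ScheduleDefinition(
    job=sync_database_job,
    cron_schedule="0 0 * * *",
)
```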
To trigger a run of the asset "users" and the dbt model "users_cleaned", update the file data/users.csv. Updates to this file are tracked by the observable source asset "users_raw", and changes trigger the downstream dbt model "users_cleaned". The same pattern could be applied to watch for new files in a directory or in blob storage by modifying the code in the observable source asset.
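The observable source asset itself reduces to a function that reports a data version derived from the file's contents; a sketch, assuming a simple content hash for "users_raw":

```python
import hashlib

from dagster import DataVersion, observable_source_asset


@observable_source_asset
def users_raw():
    # The observed data version changes only when data/users.csv changes,
    # which is what lets downstream assets react to edits to the file.
    with open("data/users.csv", "rb") as f:
        digest = hashlib.sha256(f.read()).hexdigest()
    return DataVersion(digest)
```

Swapping the open() call for a directory listing or a blob-storage client call gives the directory or blob-storage variants mentioned above.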
When both "orders_cleaned" and "users_cleaned" have been updated, the downstream model "summary" is run, along with the asset "bi_report".
The job that syncs Airbyte and runs the "orders_cleaned" dbt model has a retry policy so that the entire job can be retried in case of failure. The asset "bi_report" has a more specific retry policy so that it can be retried multiple times within one run if necessary.
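Whole-run retries are typically enabled by tagging the job with dagster/max_retries (or via run-retry settings on the Dagster instance), while the per-asset behaviour comes from a RetryPolicy on the asset itself. A sketch of the latter, with assumed numbers:

```python
from dagster import Backoff, RetryPolicy, asset


# Retry bi_report up to three times within a single run, backing off
# between attempts; the real policy's limits and delays may differ.
@asset(
    retry_policy=RetryPolicy(max_retries=3, delay=10, backoff=Backoff.EXPONENTIAL)
)
def bi_report(summary):
    ...
```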
The asset "summary" is run using a multi-asset sensor to track the upstream status of "orders_cleaned" and "users_cleaned".
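A multi-asset sensor of that shape generally looks like the sketch below; the job and sensor names are assumptions, and the real cursor handling may be more involved:

```python
from dagster import (
    AssetKey,
    AssetSelection,
    RunRequest,
    define_asset_job,
    multi_asset_sensor,
)

# Job that materializes summary (and, downstream of it, bi_report).
summary_job = define_asset_job(
    name="run_summary",
    selection=AssetSelection.keys("summary", "bi_report"),
)


@multi_asset_sensor(
    monitored_assets=[AssetKey("orders_cleaned"), AssetKey("users_cleaned")],
    job=summary_job,
)
def summary_sensor(context):
    # Fire only once both upstream models have fresh materializations.
    records = context.latest_materialization_records_by_key()
    if all(records.values()):
        context.advance_all_cursors()
        return RunRequest()
```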