Dagster Playground

Data Engineering Stack with Dagster, Trino, Nessie and Superset

Setup

This project uses Rye to manage Python versions and dependencies.

After installing Rye (see Installation | Rye), run the following to download dependencies and start Dagster:

rye sync

# aliased to 'dagster dev' - see pyproject.toml
rye run dev 

Dagster Project

Our Dagster project lives in quickstart_etl/, which is read during dagster dev. The entry point is quickstart_etl/__init__.py, which contains all the high-level definitions.

Each file in here has comments intended to help you understand the file and the purpose of its contents.
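
As a rough sketch of what that looks like (module and resource wiring below are illustrative rather than the project's exact code), the entry point collects assets and resources into a Dagster Definitions object:

    # quickstart_etl/__init__.py - illustrative sketch only
    from dagster import Definitions, load_assets_from_modules

    from . import assets

    defs = Definitions(
        # collect every @asset defined under quickstart_etl/assets/
        assets=load_assets_from_modules([assets]),
        # resource keys are placeholders; the real project wires up DuckDB,
        # Trino, Parquet and Snowflake IO managers (see Resources below)
        resources={},
    )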

Assets

For more information see: [Software Defined Assets | GitHub]

In Dagster, assets are descriptions of individual pieces of data: how to create/source them, plus any metadata (dependencies, refresh policies).
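
As a minimal, hypothetical sketch (the file path and column name are placeholders, not the project's real ones), two assets might look like this - the second depends on the first simply by naming it as a parameter:

    import pandas as pd
    from dagster import asset

    @asset
    def iris_dataset() -> pd.DataFrame:
        # placeholder path - the real asset loads Dagster's example Iris CSV
        return pd.read_csv("data/iris.csv")

    @asset
    def iris_setosa(iris_dataset: pd.DataFrame) -> pd.DataFrame:
        # taking iris_dataset as a parameter records it as an upstream
        # dependency in the asset graph
        return iris_dataset[iris_dataset["species"] == "setosa"]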

In quickstart_etl/assets/ there are two different domains with differing load sources: pagila is fetched from our PostgreSQL database and uses DBT to refine the models, while iris is sourced from a Dagster example CSV dataset.

As you'll notice from these, assets can be a single file, or organised into their own module - see [Python Practices] for more info.

Resources

This project primarily uses DuckDB and a Data Lakehouse (Trino, Parquet & Hive) to demonstrate the latest data storage options, and when best to use each.

DuckDB is fantastic for local development: it's simple to grasp and lets you quickly move around a lightweight data store. However, it doesn't scale particularly well.
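
As a quick illustration of that simplicity (the file and table names are made up for the example), DuckDB can query Parquet files directly from Python:

    import duckdb

    # open (or create) a local database - a single file on disk
    con = duckdb.connect("database.duckdb")

    # query a Parquet file in place, no load step required
    df = con.execute(
        "SELECT species, count(*) AS n FROM 'iris.parquet' GROUP BY species"
    ).df()
    print(df)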

A lakehouse trades that ease of use for extremely high scaling potential; see Trino and Hive Metastore below, and Why Lakehouse, Why Now? | Dremio, for more info.

In order to make use of a Lakehouse-style architecture, this project contains a custom Trino IO Manager - see quickstart_etl/trino_io_manager/README.md for more info.

The Parquet and Snowflake resources are cloned from the Dagster example project project_fully_featured | GitHub as references for simple IO Managers.
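
For a feel of how such a resource is shaped, here is a hypothetical sketch along the lines of the PagilaDatabase resource used in the examples further down (the real implementation and connection string will differ):

    from dagster import ConfigurableResource
    from sqlalchemy import create_engine
    from sqlalchemy.engine import Connection

    class PagilaDatabase(ConfigurableResource):
        # placeholder connection string - configure via Definitions resources
        connection_string: str = "postgresql://postgres:postgres@localhost:5432/pagila"

        def connection(self) -> Connection:
            # returns a SQLAlchemy connection that pd.read_sql can consume
            return create_engine(self.connection_string).connect()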

Docker Compose & Tech Stack

This project brings up various containers to form its "data platform".

Should you need to interact with these containers individually, you can run docker compose <action> <service name>, such as:

# Open Trino Shell
docker compose exec -it trino trino

# Tail Metabase logs
docker compose logs -f metabase

This can also be used to bring up individual containers. For example, to just run Trino (and its dependencies, like Minio) run the following: docker compose up trino. This can be useful when making configuration changes and wanting quick feedback.

Respective URLs for UIs:

Trino

Trino is a distributed SQL query engine designed to efficiently query vast amounts of data, usually against flat files like Parquet. It is also the query engine behind AWS Athena.

Being separate from the actual data storage, it can be scaled, deployed and managed independently of any storage or visualisation components.

Its container is purely the querying aspect of our Lakehouse, while storage and table metadata are handled by Minio (Parquet) and Nessie (Iceberg). See Iceberg Connector | Trino for more information.

For clarity, querying does include CRUD operations and isn't limited to SELECTs.
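
If you'd rather query from Python than the Trino shell, the trino client library can be used along these lines (host, port, catalog and schema are assumptions - match them to this project's docker-compose configuration):

    from trino.dbapi import connect

    # connection details are illustrative - check docker-compose.yml for the
    # actual port, catalog and schema names
    conn = connect(host="localhost", port=8080, user="dagster", catalog="iceberg", schema="pagila")

    cur = conn.cursor()
    cur.execute("SELECT customer_id, first_name FROM customer LIMIT 5")
    for row in cur.fetchall():
        print(row)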

Hive Metastore

TODO

Metabase

Metabase is a BI / analytics platform with a built-in query runner and various data visualisations.

We use a custom Dockerfile due to Error loading shared library with DuckDB driver | GitHub; for more info see ./docker/metabase/Dockerfile.

On start-up the DuckDB file should be available under /duckdb/database.duckdb, which you can configure in Metabase's UI under Add Database | localhost:5000.

NB: On first access, Metabase will prompt you for a user, which can be fake data. As the Metabase DB is in a volume, you won't be prompted again.

Superset

The Superset setup here is simplified for local testing; for a more complete and up-to-date setup, check out apache/superset | GitHub.

Python Practices

File & Module Composition

Single files should be preferred over modules: quickstart_etl/assets/iris_csv.py is a single file, in comparison to the Pagila assets module, quickstart_etl/assets/pagila.

When creating modules, __init__.py is normally a good starting point for containing everything. However, it can quickly grow, decreasing readability.

To remedy this, we can split __init__.py into multiple files named after their purpose. For example, quickstart_etl/assets/pagila has a file solely for DBT assets and one for fetching PostgreSQL data.
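
The resulting __init__.py then just re-exports the split-out definitions; the file and asset names below are illustrative, not the project's exact ones:

    # quickstart_etl/assets/pagila/__init__.py - illustrative layout
    from .dbt import pagila_dbt_assets
    from .postgres import customer, film, rental

    __all__ = ["pagila_dbt_assets", "customer", "film", "rental"]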

Chunked Loading of SQL DataFrames

When fetching large tables from a database, it's fairly easy to overload your RAM. The following are ways to avoid this (ordered from least to most intrusive):

  1. Instruct Pandas to load the table in chunks and concatenate them:

    import pandas as pd

    def customer(psql: PagilaDatabase) -> pd.DataFrame:
        return pd.concat(
            # with chunksize set, read_sql returns Iterator[pd.DataFrame]
            pd.read_sql(
                "my_table",
                psql.connection(),
                chunksize=10000,
            ),
            ignore_index=True,
        )
  2. Another technique is using pyarrow as a type backend rather than numpy. [PyArrow | Arrow Docs] exposes bindings to the C++ Apache Arrow library; these static types use fewer bytes than the dynamic typing of Python and NumPy:

    def customer(psql: PagilaDatabase) -> pd.DataFrame:
        return pd.read_sql("my_table", psql.connection(), dtype_backend="pyarrow")

    This difference can be seen by running df.memory_usage(deep=True).sum() / 1024**2 to fetch the MB usage of a DataFrame, and df.memory_usage(deep=True) to display the bytes taken by each column - see the snippet after this list.

  3. For extremely wide tables (many columns), restricting the DataFrame to specific columns can have a major impact:

    def customer(psql: PagilaDatabase) -> pd.DataFrame:
        return pd.read_sql(
            "my_table", 
            psql.connection(), 
            columns=["id", "forename", "surname", "dob"],
            dtype_backend="pyarrow"
        )
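
To compare these approaches for yourself, a small helper like the following (hypothetical; the commented calls assume the psql resource from the earlier examples) prints a DataFrame's memory footprint:

    import pandas as pd

    def report_memory(df: pd.DataFrame) -> None:
        # bytes taken by each column, including object/string contents
        print(df.memory_usage(deep=True))
        # total footprint in MB
        print(f"{df.memory_usage(deep=True).sum() / 1024**2:.1f} MB")

    # e.g. compare the numpy and pyarrow backends for the same table:
    # report_memory(pd.read_sql("my_table", psql.connection()))
    # report_memory(pd.read_sql("my_table", psql.connection(), dtype_backend="pyarrow"))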
    

References