A reference repository for implementing parallel ops in Dagster that support sequential chaining.
🔍 You will find all these job definitions in the `src/jobs.py` file.
The standard static job is what you would typically build in Dagster. If you were developing an ETL pipeline for a large raw dataset, for example, the job would first extract all of the data, then apply the various transforms, and finally load the result into the destination data store. You'd have a Gantt chart that looks like this:
However, this can be very slow, especially if you're running many expensive transforms on a huge amount of data, since each step iterates through the data sequentially. As mentioned in the Dagster documentation, even if a job can be parallelized internally, the whole thing needs to start over if something goes wrong.
With Dynamic Graphs, we can parallelize the work across batches of the data using `DynamicOut` and `map`, instead of having to iterate through the entire dataset in one pass.
This is better than the static job in terms of performance, but only if your ETL pipeline doesn't require that your data be transformed sequentially.
However, as you'll notice in the Gantt chart above, there's another problem: all the mapped copies of one op are executed before any copy of the next op starts. This can be an issue if memory is a concern wherever Dagster is deployed, because this setup still retrieves all of the data before running the subsequent transform and load steps.
Using Dagster's `dagster/priority` tag, we can tell Dagster to run the next op in the sequence immediately by giving later ops a higher priority. Looking at the Gantt chart below, a group of batches (as many as your machine can run concurrently, or as many as `max_concurrent` specifies) is executed end-to-end before moving on to the next group of batches.
With this, you control how many ops run at the same time, and the pipeline has the opportunity to clean up data from batches that have already been fully processed.
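To make the memory argument concrete, here is a small standard-library model of a single worker picking the highest-priority ready step. It is a simplification written for this explanation, not Dagster's scheduler: the `simulate` function and its bookkeeping are hypothetical, and it assumes a batch stays in memory from its extract step until its load step finishes.

```python
import heapq

STAGES = ("extract", "transform", "load")


def simulate(num_batches, priority):
    """Return (execution order, peak number of batches held in memory)."""
    # Heap entries: (negated priority, arrival order, batch, stage index).
    ready = [(-priority["extract"], seq, seq, 0) for seq in range(num_batches)]
    heapq.heapify(ready)
    seq = num_batches
    order, in_memory, peak = [], set(), 0
    while ready:
        _, _, batch, stage = heapq.heappop(ready)
        order.append(f"{STAGES[stage]}[{batch}]")
        if stage == 0:
            in_memory.add(batch)  # batch data is now loaded
        peak = max(peak, len(in_memory))
        if stage + 1 < len(STAGES):
            # Finishing a step makes the batch's next step ready.
            next_stage = STAGES[stage + 1]
            heapq.heappush(ready, (-priority[next_stage], seq, batch, stage + 1))
            seq += 1
        else:
            in_memory.discard(batch)  # fully processed, can be cleaned up
    return order, peak


flat_order, flat_peak = simulate(3, {"extract": 0, "transform": 0, "load": 0})
prio_order, prio_peak = simulate(3, {"extract": 0, "transform": 1, "load": 2})
print(flat_order, flat_peak)
print(prio_order, prio_peak)
```

With equal priorities the model extracts all three batches before transforming any of them, so all three are held at once. With increasing priorities each batch runs from extract to load before the next one starts, and only one batch is held at a time.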
The data was downloaded from "The Complete Pokemon Dataset" on Kaggle:
https://www.kaggle.com/datasets/rounakbanik/pokemon
The CSV has also been provided in the `./data` directory.
Make sure you have Python 3.11 installed on your system and that the Pokemon CSV is in `./data`, then run:
make init
make setup
make run
You should be able to access Dagit via http://localhost:3000.