WordPress/openverse-catalog

Trigger TSV loader as a subDAG or DAG run

AetherUnbound opened this issue · 7 comments

Current Situation

Currently the TSV loader script runs every minute. For the vast majority of its runs (at least at the moment), it skips because there are no files available. Of the 35,300+ runs we have so far, only 8 have ever executed the entire workflow.

Suggested Improvement

We should instead trigger the TSV loading as part of the provider DAGs. This can be done with either the TriggerDagRunOperator[1] or via SubDAGs[2].
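For illustration, here's a minimal sketch of the first option, where each provider DAG ends by triggering the loader. The DAG ids and the `conf` payload are hypothetical names, not anything from the current codebase:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG("example_provider_dag", start_date=datetime(2021, 1, 1)) as dag:
    # Appended as the final step of a provider workflow; "tsv_load_dag"
    # and the conf payload are hypothetical names for illustration.
    trigger_load = TriggerDagRunOperator(
        task_id="trigger_tsv_load",
        trigger_dag_id="tsv_load_dag",
        conf={"media_type": "image"},
    )
```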

Benefit

Both of these strategies would significantly reduce the number of runs this DAG completes, making it easier to navigate runs in the UI, track failures, etc.

Additional context

We should also probably clear out the table of skipped runs at some point. They don't really provide us with much useful information.

Implementation

  • 🙋 I would be interested in implementing this feature.

Footnotes

  1. https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/trigger_dagrun/index.html

  2. https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#subdags

I have some work in progress on feature/tsv-loader-improvements#285 which adds loading steps per media type to the end of provider workflows. One issue here, though, is that it requires passing the output TSV(s) via XCom. Previously, the TSV loader would just pick up the least recently edited TSV regardless of media type and process it. In order to tie the loading directly into the provider workflows, the loading steps need to know which TSV to process and what media type it is. This is further complicated by providers like Wikimedia Commons, which produce both image and audio TSVs.
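As a rough sketch of what this looks like on the loading side (the task ids and XCom keys here are hypothetical), the load task for each media type could pull its path out of XComs:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def load_tsv(media_type, **kwargs):
    # "report_output_tsv" and the XCom key are hypothetical names; the
    # provider side would push one path per media type it produces.
    tsv_path = kwargs["ti"].xcom_pull(
        task_ids="report_output_tsv", key=f"{media_type}_tsv"
    )
    print(f"Loading {media_type} TSV from {tsv_path}")

with DAG("example_provider_dag", start_date=datetime(2021, 1, 1)) as dag:
    load_image_tsv = PythonOperator(
        task_id="load_image_tsv",
        python_callable=load_tsv,
        op_kwargs={"media_type": "image"},
    )
```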

One advantage the current system has is that it can process TSVs even if the provider script fails at some point. We'll definitely want to retain this functionality going forward. As such, we can't wait until the provider script completes to pass the produced TSV into XComs.

Although we want to change the way the provider scripts are structured in #229, for now we can potentially put a shim between the `<provider>.main` function and the workflow creation DAG. The output directory can be pulled from the module's media store (available at the module level) before actual provider processing begins and be immediately pushed into XComs.

Down the line, I think we'll want to invert this process and have the shim create the output filename and pass it down to the media store, rather than pull it out once the run has started. The alternative to all this is to update every provider script to yield the output TSV filename at the beginning of its run, which seems like more work than desired at this juncture.
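A rough sketch of that shim idea, assuming each provider module exposes its media store(s) at module level (the module path, attribute names, and task structure here are all hypothetical, since the actual media store API may differ):

```python
def report_output_paths(**kwargs):
    # Hypothetical import and attributes: we assume the provider module
    # holds its media store(s) at module level, so the output paths are
    # known before any provider processing starts.
    from providers.provider_api_scripts import wikimedia_commons

    ti = kwargs["ti"]
    for media_type, store in wikimedia_commons.media_stores.items():
        ti.xcom_push(key=f"{media_type}_tsv", value=store.output_path)
```

This would run as the first task of the provider workflow, ahead of `<provider>.main`, so the paths land in XComs even if the pull later fails.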

> One advantage the current system has is that it can process TSVs even if the provider script fails at some point. We'll definitely want to retain this functionality going forward.

For context, this is the main reason it was set up that way. While I agree that it's not great to have so many unused runs, I think it's pretty important to decouple pulling the data from the internet from putting the data into the DB. We were having big problems with data getting pulled down to disk, then left there because some provider was having an availability hiccup, or because our own DB was unavailable for some reason or another. A couple of times this resulted in the instance running the script going down due to lack of disk space.

That's incredibly helpful context, thank you! I think there are a few ways to ensure that the TSVs will still be loaded even if those hiccups are encountered. Retries & Airflow's trigger rules will be key for making the loading fault tolerant.
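A minimal sketch of how a trigger rule keeps the load alive after a pull failure (the task names, placeholder bodies, and retry count are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

def pull_data(**kwargs):
    print("pull data from the provider API")  # placeholder body

def load_tsv(**kwargs):
    print("load whatever the pull managed to write")  # placeholder body

with DAG("example_provider_dag", start_date=datetime(2021, 1, 1)) as dag:
    pull = PythonOperator(task_id="pull_data", python_callable=pull_data)
    # ALL_DONE runs the load once the pull finishes in *any* state, so a
    # partial pull still gets ingested; retries cover transient DB hiccups.
    load = PythonOperator(
        task_id="load_tsv",
        python_callable=load_tsv,
        trigger_rule=TriggerRule.ALL_DONE,
        retries=3,
    )
    pull >> load
```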

Note to myself - the date used within the output filename should really just be the DAG's execution date. That would prevent us from having to pass it through XComs. I'll stay with the current implementation for now because I don't want to change too much, but I'll probably have a follow-up ticket for simplifying that.
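For reference, that follow-up could lean on Airflow's templating, since `op_kwargs` is a templated field; the path pattern below is hypothetical:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def load_tsv(tsv_path, **kwargs):
    print(f"Loading {tsv_path}")

with DAG("example_provider_dag", start_date=datetime(2021, 1, 1)) as dag:
    # Both the pull and load steps can derive the same filename from the
    # execution date via the {{ ds_nodash }} macro, with no XComs involved.
    load = PythonOperator(
        task_id="load_image_tsv",
        python_callable=load_tsv,
        op_kwargs={"tsv_path": "/tmp/workflow_output/image_{{ ds_nodash }}.tsv"},
    )
```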

A few more notes:

  1. We probably want a Slack message notification once a provider script has completed successfully.
  2. Retries should be set to 0 for the data pull steps, so that if a provider fails it will 1) stop trying, letting us investigate without immediately running into the same issue again, and 2) hand off whatever we've been able to pull up to that point for ingestion.
  3. How do we want to handle no records pulled? If we aren't pulling records, something has gone wrong upstream, so the loading tasks should just skip rather than error out as well (see the sketch after this list).
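A minimal sketch of that skip behaviour from item 3, using a file emptiness check as a stand-in for a real record count:

```python
import os

from airflow.exceptions import AirflowSkipException

def load_tsv(tsv_path, **kwargs):
    # An empty or missing TSV means nothing was pulled; raising
    # AirflowSkipException marks this task (and, under the default
    # all_success trigger rule, its downstream tasks) as skipped
    # rather than failed.
    if not os.path.exists(tsv_path) or os.path.getsize(tsv_path) == 0:
        raise AirflowSkipException(f"No records to load from {tsv_path}")
    print(f"Loading {tsv_path}")
```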

> How do we want to handle no records pulled?

Could this scenario also occur for date-based DAGs that don't have any new data?

> Could this scenario also occur for date-based DAGs that don't have any new data?

Absolutely, that's a great point! Gives more credence to the "skip if no data" direction.