WordPress/openverse

Update dated DAGs to allow for backfilling data


Problem

A dated provider DAG is one whose main function for the pull_data task accepts a date parameter representing the date for which data should be ingested. An example is Wikimedia Commons. Generally, a dated DAG ingests one day's worth of data, so it makes sense that these DAGs should be on the @daily schedule.* By default the date passed in is today's date, optionally shifted back by a given day_shift (so day_shift=1 would run for yesterday's data).
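For illustration, the entry point of a dated provider script looks roughly like this (a minimal sketch with hypothetical names and defaults, not the actual Wikimedia Commons code):

```python
from datetime import datetime, timedelta


def main(date=None, day_shift=0):
    """Ingest one day's worth of records for `date` (YYYY-MM-DD).

    If no date is given, default to today's date shifted back by `day_shift` days.
    """
    if date is None:
        date = (datetime.now() - timedelta(days=day_shift)).strftime("%Y-%m-%d")
    # ... call the provider API for records from `date` and write them to TSVs
    print(f"Ingesting records for {date}")
```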

The problem is that when a dated DAG is turned off or doesn't run for a period of time, there is currently no easy way to backfill the data for the missed days. Airflow catchup allows us to run all of the missed runs: for example, if a @daily DAG was turned off for five days, when we turn it back on it will run 6 times (once for today and once for each missed day). But with the current setup, all 6 runs will ingest data for today's date.

* Where this isn't true, we should probably fix it. #1643 for example will update Wikimedia Commons to run daily.

Description

We should instead use Airflow's {{ execution_date }}, which references the date of the scheduled DAG run -- meaning that when the catchup DAG runs kick off, they will ingest data for the correct date!
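As a rough sketch of what that could look like, assuming a PythonOperator-based pull_data task (the catalog's provider factory may wire this up differently; the dag_id and start_date below are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def pull_data(date, **kwargs):
    # `date` is the logical date of this particular DAG run, so each
    # catchup run ingests its own missed day instead of today's data.
    ...


with DAG(
    dag_id="example_dated_provider_workflow",  # hypothetical dag_id
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
) as dag:
    pull_data_task = PythonOperator(
        task_id="pull_data",
        python_callable=pull_data,
        # ds is the run's execution (logical) date formatted as YYYY-MM-DD
        op_kwargs={"date": "{{ ds }}"},
    )
```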

Implementation

  • 🙋 I would be interested in implementing this feature.

Upon further research, I think we should test this to verify whether it's true. It looks like execution_date was actually deprecated, and ds, which we're currently using, should provide the DAG run's logical date: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html. Having updated Airflow in WordPress/openverse-catalog#372, I think this might actually be working already.

We should keep this open to test and verify at the very least, since it's critical that we're able to backfill that data!

Thanks for drafting this! Admittedly, it's very possible that I misinterpreted the current setup and that it's behaving as expected now. Either way, it's a good opportunity to verify that and adjust if needed!

Thinking about this again - what would be really cool is to use execution_date and next_execution_date (or prev_execution_date) as the date bounds! That way we don't even have to specify a day_shift ourselves; the Airflow execution window gives us the necessary boundaries. That would allow us to change the schedule interval without having to adjust the day_shift or anything else in order to run dated DAGs over a previous interval!
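Something along these lines, perhaps (continuing the hypothetical pull_data task from the sketch above; the parameter names are made up, and the real provider scripts would need to accept a start/end pair):

```python
pull_data_task = PythonOperator(
    task_id="pull_data",
    python_callable=pull_data,
    op_kwargs={
        # Bounds come straight from the scheduled interval, so no day_shift
        # is needed and the schedule interval can change freely.
        "start_date": "{{ execution_date.strftime('%Y-%m-%d') }}",
        "end_date": "{{ next_execution_date.strftime('%Y-%m-%d') }}",
    },
)
```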

Is this issue the best place to discuss backfill for providers in general? I was just thinking about audio's beta status, and increasing the number of audio results would be a nice way to contribute to promoting audio as fully-supported.

I think once we address the mechanism for backfilling data, we should definitely have a tracking ticket for setting up backfills on all providers that support it!

I've been thinking about how to approach this:

First, for context, the dated DAGs under consideration are: Europeana, Flickr, Metropolitan, Phylopic, and Wikimedia Commons. Of these, Europeana and Flickr are currently turned off.

Just turning catchup=True on works (with some things to consider)

I tested with Wikimedia Commons and confirmed that if we set catchup=True on the DAG, backfill will work. By default, this means Airflow schedules a run for each of the missed dates, starting at the start_date. So it first runs a DagRun for 1/1/1970, then 1/2/1970, and so on. This means that if we turn catchup on, the DAG will be very busy backfilling data for the last 50 years before it gets around to ingesting new data from today. Some thoughts:

  • Airflow has an option to run the backfill “backwards”, meaning it would run for today's date first, then yesterday, and so on.
  • We can adjust the start_date to reduce the backfill window.

Another potential problem is that once catchup=True is on, the backfill will automatically restart if we ever lose the DagRun history (since that’s how Airflow determines whether a backfill is needed). Recently we renamed the statenmuseum DAG to smk, which caused loss of this history, so this is a real possibility.

We could consider using environment variables to manually toggle catchup True/False for dated DAGs. For non-dated DAGs, we should hard-code catchup to False since it will never be necessary.
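A sketch of what that toggle could look like (the environment variable name, dag_id, and start_date are all illustrative):

```python
import os
from datetime import datetime

from airflow import DAG

# Dated DAGs read catchup from the environment so a backfill can be turned on
# and off without a code change; non-dated DAGs would hard-code catchup=False.
DATED_DAG_CATCHUP = os.getenv("DATED_DAG_CATCHUP", "false").lower() == "true"

with DAG(
    dag_id="wikimedia_commons_workflow",  # illustrative
    start_date=datetime(2022, 1, 1),      # a much narrower window than 1/1/1970
    schedule_interval="@daily",
    catchup=DATED_DAG_CATCHUP,
    max_active_runs=1,                    # keep backfill runs from piling up
) as dag:
    ...
```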

Running the Airflow backfill command

Airflow has a backfill CLI command (airflow dags backfill) that can be used to manually re-run a DAG over a given date range (from this article). This could be useful for a one-time backfill.

Leveraging the ingestion_workflows

Europeana, Flickr, and Wikimedia Commons already have ingestion_workflow DAGs defined in addition to the “regular” provider DAG. (Metropolitan and Phylopic do not, but they're trivial to generate with the factory.) These are daily DAGs that generate a list of days, generally weighted toward more recent days, and then run just the pull_data step for each of those days.
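For anyone unfamiliar with them, the day-list generation works roughly like this (an illustrative sketch only, not the actual catalog implementation or its real weighting):

```python
from datetime import date, timedelta


def reingestion_days(today: date) -> list:
    """Build a list of dates to reingest, weighted toward recent days."""
    days = []
    # Every day from the last week
    days += [today - timedelta(days=d) for d in range(1, 8)]
    # Roughly weekly for the next few months back
    days += [today - timedelta(days=d) for d in range(14, 98, 7)]
    # Roughly monthly beyond that, up to about two years back
    days += [today - timedelta(days=d) for d in range(120, 730, 30)]
    return days
```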

The purpose of these DAGs was to reingest/update old data in a smart way, with an emphasis on updating more recent data first. I believe the intention was to update popularity data and such, but if we could get these DAGs working, they would also naturally backfill data over time.

The workflows only run the pull_data step; TSVs are generated but the data is not loaded into the DB. So we'd need to update the workflows to make sure the data is loaded. But as an additional benefit, we would be able to run these independently of the normal ingestion process, and so they would not interrupt typical ingestion.

I think we could tackle this in two steps:

  1. Turn on catchup=True and adjust the start_date to the beginning of this year, or to whenever our first actual recorded run of the DAG was, or maybe something else entirely (up for discussion!). I don't think we should leave it as 1/1/1970 though 😅 This would be useful since it gives us the runs within the DAG we're already using and keeps the run data consistent with all our other runs.
  2. With that in place, we could then work on getting the reingestion workflows back up and running, for all the advantages you list.

I only recommend going with that first step because it's fairly straightforward and unblocks the backfilling while we work on the second step. I have no idea how hairy or difficult step 2 will be, so having something running while we look into it seems ideal!

My only hesitation is that it will pause ingestion of more current data until the backfill is complete -- but I think that's actually not important since (a) we'll still be ingesting lots of new data and (b) the data refresh isn't turned on at the moment, so it's a moot point 😄

As for the reingestion workflows, I prefer them as the long-term solution, in part because we need to get them working anyway. The simplest approach would be to update them to trigger a DagRun of the normal provider workflow for each reingestion date, complete with the load_data step. The big thing to figure out will be concurrency problems 😱
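For example, the reingestion DAG could fan out one TriggerDagRunOperator per date. This is only a sketch: it assumes the provider DAG can read its date from dag_run.conf, reuses the hypothetical reingestion_days helper sketched above, and the dag_id is illustrative.

```python
from datetime import date

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_tasks = [
    TriggerDagRunOperator(
        task_id=f"reingest_{day.isoformat()}",
        trigger_dag_id="wikimedia_commons_workflow",  # illustrative
        conf={"date": day.isoformat()},
        # wait_for_completion plus a low concurrency setting on the provider
        # DAG would be one way to keep these runs from stepping on each other.
        wait_for_completion=True,
    )
    for day in reingestion_days(date.today())
]
```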

In the meantime, I'll try to come up with some more reasonable start_dates!