MarquezProject/marquez

Handling of input datasets where schema different from current version

davidjgoss opened this issue · 1 comments

Sometimes when a dataset is declared as an input, and we've seen it before as an output, the schema in the incoming event might be differ to that of the current dataset version. There are a few reasons why this might happen:

  • The schema genuinely has changed between runs, because some DDL change happened outside our OpenLineage-instrumented jobs
  • We have a different level of metadata fidelity in events for input datasets vs output datasets

This test illustrates how Marquez handles this today:

When reading a dataset, even when reporting a schema that differs from the prior written schema, the dataset version doesn't change.

So we effectively amend the current dataset version to have the union of its existing fields plus anything new from the incoming event. This totally makes sense, although it's a bit weird in that the dataset version is sort of supposed to be immutable - the fields contribute to its UUID at creation time - and we're mutating it.

This has bubbled up now because in #2763 I'm adding the dataset schema version concept and looking at how to handle this scenario with input datasets. In this case we would definitely not want to mutate the dataset schema version - it is inherently an immutable set of fields, and could be referenced by any number of dataset versions.

There are a few options, I think:

  1. Compute and upsert a new dataset schema version from the union of existing fields plus anything new from the incoming event, then set that changed version on dataset_versions.dataset_schema_version_uuid - this would be the most consistent with current behaviour, without derailing the schema version concept, but it will make the code a bit fussy and difficult to "push down" into the database as we are looking at doing for performance reasons
  2. Use the differing schema as signal that is a different version, and create a new dataset version accordingly, including the new schema - sort of equivalent to preceding the run/job event with a DatasetEvent, I guess. This would make things simpler and make a dataset version genuinely immutable, but would also be a breaking change and have implications for the lineage graph
  3. Treat this as an error and reject the OpenLineage event - putting this here for completeness but I doubt it's a serious option

cc @wslulciuc

There is also some slightly odd behaviour I noticed that's probably related to this...

If you run a job that loads 2 columns from a source dataset to a target dataset, you get a lineage graph like:
image

and a column lineage graph like:
image

However, if you run the same job, but it now loads an additional column from the source dataset (address), things seemingly become inconsistent... The lineage graph looks like this:
image

You can see the new column in the output dataset, but not on the input dataset. However, the column lineage graph looks like:
image

Where you can see the new column. In addition, if you get the dataset from the API (/api/v1/namespaces/default/datasets/source), you can see the new column:
image

I'm not sure why you can see the new column in the column lineage graph and the dataset returned from /api/v1/namespaces/default/datasets/source, but not in the lineage graph.