MrPowers/mack

type_2_scd_generic_upsert does not handle NULL values properly


The current way the type_2_scd_generic_upsert function checks for changes in a row involves evaluating each column, and this does not yield the expected result when values are NULL. A possible performance improvement that also caters for NULL values is to add a hash column calculated from the contents of the columns in the table (except the SCD2 system columns).

Some background: https://datacadamia.com/dit/owb/scd2_hash

Besides some code around it, the most interesting changes would be:

a UDF to calculate the hash:

differential_hash_udf = udf(
    lambda row: calculate_differential_hash(row, attr_col_names), StringType()
)
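
calculate_differential_hash isn't shown above; a minimal sketch of such a helper (assuming the struct column arrives in the UDF as a Row and attr_col_names is the list of attribute column names) could look like:

import hashlib

from pyspark.sql.functions import struct, udf
from pyspark.sql.types import StringType

def calculate_differential_hash(row, attr_col_names):
    # Map NULLs to a sentinel so they hash consistently instead of breaking
    # the comparison, then hash the concatenated attribute values.
    parts = ["__NULL__" if row[c] is None else str(row[c]) for c in attr_col_names]
    return hashlib.sha256("||".join(parts).encode("utf-8")).hexdigest()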

addition of the hash column to the update set:

updates_df = updates_df.withColumn(
    differential_hash_col_name, differential_hash_udf(struct(*attr_col_names))
)

stage changes based on the hash of the columns instead of a column-by-column comparison:

staged_part_1 = (
    updates_df.alias("updates")
    .join(delta_table.toDF().alias("base"), primary_key)
    .where(
        f"base.{is_current_col_name} = true AND base.{differential_hash_col_name} <> updates.{differential_hash_col_name}"
    )
    .selectExpr("NULL as mergeKey", "updates.*")
)

merge using the hash column:

res = (
    delta_table.alias("base")
    .merge(
        source=staged_updates.alias("staged_updates"),
        condition=pyspark.sql.functions.expr(f"base.{primary_key} = mergeKey"),
    )
    .whenMatchedUpdate(
        condition=f"base.{is_current_col_name} = true AND base.{differential_hash_col_name} <> staged_updates.{differential_hash_col_name}",
        set={
            is_current_col_name: "false",
            end_time_col_name: f"staged_updates.{effective_time_col_name}",
        },
    )
    # res_thing: the insert values mapping (target column -> staged_updates expression), built elsewhere
    .whenNotMatchedInsert(values=res_thing)
    .execute()
)

This is a breaking (not backwards compatible) change to what is currently in the function, so you could consider making this a separate function?

This is the other issue mentioned in #120.

@dgcaron thank you for flagging this. I am not so familiar with the topic; could you explain the potential performance boost and how this will fix the NULL value issues?

I believe the issue arises here: https://github.com/MrPowers/mack/blob/main/mack/__init__.py#L103

staged_updates_attrs = list(
    map(lambda attr: f"staged_updates.{attr} <> base.{attr}", attr_col_names)
)

If the base.{attr} value is NULL after the initial write, the SCD2 table isn't built properly on subsequent writes. This has to do with the way SQL and PySpark handle NULL values in comparisons: NULL <> anything evaluates to NULL rather than true, so the changed row never passes the filter.
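
A standalone illustration (made-up column names, not the mack code):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, None), (2, "old")], ["pkey", "attr"])

# NULL <> 'new value' evaluates to NULL, so the row with the NULL attr is
# silently dropped by the filter and never staged as a change.
df.filter(F.expr("attr <> 'new value'")).count()  # 1 (only the 'old' row)

# A null-safe comparison flags both rows as changed.
df.filter(~F.col("attr").eqNullSafe(F.lit("new value"))).count()  # 2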

The performance boost should come from the fact that you compare a precalculated hash (the hash is also persisted in the SCD2 table) on both sides instead of all the columns of interest; you don't have to evaluate each column on both sides to check for changes this way.

I'll add a test case that shows the issue in the upcoming days.

@dgcaron understood. Precalculating a hash column and comparing only that is way less expensive than comparing each column, and it would solve the Spark NULL problem, although this might also be solvable through eqNullSafe (not sure though).

Yes, I guess eqNullSafe should solve the NULL issue too, but I am not sure how to implement that properly with an expression string. It would allow for a non-breaking change though; if that is the preference, then I can take a look some time soon.
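
For reference, Spark SQL also has a null-safe equality operator (<=>) that works inside expression strings, so the column-by-column comparison above could be made null-safe without switching to the Column API. A rough sketch of that non-breaking variant:

# NOT (a <=> b) is true whenever the values differ, including when one side is NULL.
staged_updates_attrs = list(
    map(lambda attr: f"NOT (staged_updates.{attr} <=> base.{attr})", attr_col_names)
)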

@dgcaron Maybe we could do that as a first step. We could still implement an SCD2 with a hash later :)

@dgcaron - just a friendly ping on this one. Would love to get this fix added! Thank you!