update function type_2_scd_generic_upsert
Hey,
In the upsert query, instead of:
```python
baseTable.alias("base")
.merge(
    source=stagedUpdates.alias("staged_updates"),
    condition=pyspark.sql.functions.expr(
        f"base.{primaryKey} = mergeKey AND base.{isCurrentColName} = true AND ({stagedUpdatesAttrs})"
    ),
)
.whenMatchedUpdate(
    set={
        isCurrentColName: "false",
        endTimeColName: f"staged_updates.{effectiveTimeColName}",
    }
)
.whenNotMatchedInsert(values=res_thing)
.execute()
```
you can write it as:
```python
baseTable.alias("base")
.merge(
    source=stagedUpdates.alias("staged_updates"),
    condition=pyspark.sql.functions.expr(
        f"base.{primaryKey} = mergeKey"
    ),
)
.whenMatchedUpdate(
    condition=f"base.{isCurrentColName} = true AND ({stagedUpdatesAttrs})",
    set={
        isCurrentColName: "false",
        endTimeColName: f"staged_updates.{effectiveTimeColName}",
    },
)
.whenNotMatchedInsert(values=res_thing)
.execute()
```
This prevents duplicate rows when the function is run repeatedly with no changes in the base and updates tables: with the original condition, a staged row whose attributes are unchanged fails the merge match and falls through to `whenNotMatchedInsert`, so it gets inserted again on every run. Matching on the key alone and moving the attribute comparison into `whenMatchedUpdate` makes the re-run a no-op.
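
For context, a minimal sketch of the idempotency check such a test could assert. The argument order of `type_2_scd_generic_upsert`, the path, and the schema here are illustrative assumptions, not the library's documented API:

```python
# Sketch of an idempotency check for the proposed change. The argument
# order of mack.type_2_scd_generic_upsert, the path, and the schema are
# illustrative assumptions; adjust to the actual signature.
import mack
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumes Delta Lake is configured

# Assumes a Type 2 SCD Delta table already exists at this (hypothetical) path.
path = "/tmp/delta/customers"

updates_df = spark.createDataFrame(
    [(1, "A", "2024-01-01")],
    ["pkey", "attr", "effective_time"],
)

def table_count():
    return spark.read.format("delta").load(path).count()

mack.type_2_scd_generic_upsert(
    path, updates_df, "pkey", ["attr"],
    "is_current", "effective_time", "end_time",
)
first = table_count()

# Re-running with identical updates should be a no-op. With the original
# merge condition, the unchanged staged row fails the match and falls
# through to whenNotMatchedInsert, producing a duplicate; with the
# proposed condition, the row count stays the same.
mack.type_2_scd_generic_upsert(
    path, updates_df, "pkey", ["attr"],
    "is_current", "effective_time", "end_time",
)
assert table_count() == first
```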
@Amrit-Hub - thanks for commenting. Can you please open a PR with your code? Can you also add tests to demonstrate why this implementation is better? Thank you!
@Amrit-Hub - just following up. Thank you.
Is the commit referenced on Apr 16 something that is considered to be merged? I run into the situation described there: I get a full data dump each day, and not all records in the dump are updates / have actual changes. With the current version this results in duplicate rows.
Edit:
I have been trying the proposed change, and it seems the upsert doesn't handle undefined values very well either. If you would consider the change applicable, I would like to create a PR for it that also handles the undefined values.
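
For reference, one way to make the attribute comparison tolerant of NULLs is Spark SQL's null-safe equality operator `<=>`: a plain `<>` comparison evaluates to NULL when either side is NULL, so a change to or from NULL is silently missed. A minimal sketch, assuming `stagedUpdatesAttrs` is built as an OR-joined list of per-column comparisons (the column names are illustrative):

```python
# Illustrative column list; in mack these would come from attr_col_names.
attr_col_names = ["attr1", "attr2"]

# NOT (a <=> b) is true exactly when the values differ, treating NULL as
# a comparable value (NULL <=> NULL is true, NULL <=> x is false), so
# rows where a column changes to or from NULL are still detected.
stagedUpdatesAttrs = " OR ".join(
    f"NOT (staged_updates.{attr} <=> base.{attr})" for attr in attr_col_names
)
```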
@dgcaron it would be greatly appreciated if you could contribute a PR and tests to cover the scenario! :)